Github user askprasanna commented on a diff in the pull request:
https://github.com/apache/storm/pull/2156#discussion_r121342954
--- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
@@ -134,110 +130,86 @@ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExcee
            Time.advanceTime(50);
            //No backoff for test retry service, just check that messages will retry immediately
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                spout.nextTuple();
            }
            ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), retryMessageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyObject(), anyObject(), retryMessageIds.capture());
            //Verify that the poll started at the earliest retriable tuple offset
            List<Long> failedOffsets = new ArrayList<>();
-            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
                failedOffsets.add(msgId.offset());
            }
            InOrder inOrder = inOrder(consumerMock);
            inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
            inOrder.verify(consumerMock).poll(anyLong());
        }
    }
-
+
+    private List<ConsumerRecord<String, String>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+        List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            recordsForPartition.add(new ConsumerRecord(topic.topic(), topic.partition(), startingOffset + i, "key", "value"));
+        }
+        return recordsForPartition;
+    }
+
    @Test
-    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
-        /*
-        The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
-        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
-        The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
-        This means that the latest offset a poll can start at for a retriable partition,
-        counting from the last committed offset, is maxUncommittedOffsets,
-        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
-        If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
-        The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
-
-        It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
-        so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
-
-        This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
-        */
-
-        //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+        //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
-
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            firstPollRecords.put(partition, firstPollRecordsForPartition);
-
-            int maxPollRecords = 5;
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
-            for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
-            }
-            secondPollRecords.put(partition, secondPollRecordsForPartition);
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            Set<TopicPartition> partitions = new HashSet<>();
+            partitions.add(partition);
+            partitions.add(partitionTwo);
+            setupSpout(partitions);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+            records.put(partitionTwo, createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets()));
            when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPollRecords))
-                .thenReturn(new ConsumerRecords(secondPollRecords));
+                .thenReturn(new ConsumerRecords(records));
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
+            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets()*2; i++) {
                spout.nextTuple();
            }
            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture());
-
-            KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
-            spout.fail(failedMessageId);
-
+            verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets()*2)).emit(anyObject(), anyObject(), messageIds.capture());
+
+            //Now fail a tuple on partition 0 and verify that it is allowed to retry
+            //Partition 1 should be paused, since it is at the uncommitted offsets limit
+            Optional<KafkaSpoutMessageId> failedMessageId = messageIds.getAllValues().stream()
+                .filter(messageId -> messageId.partition() == partition.partition())
+                .findAny();
+
+            spout.fail(failedMessageId.get());
+
            reset(collectorMock);
-
-            //Now make the single failed tuple retriable
+
            Time.advanceTime(50);
-            //The spout should allow another poll since there are now only maxUncommittedOffsets - 1 nonretriable tuples
-            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
-                spout.nextTuple();
-            }
-
-            ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(maxPollRecords)).emit(anyObject(), anyObject(), retryBatchMessageIdsCaptor.capture());
-            reset(collectorMock);
-            //Check that the consumer started polling at the failed tuple offset
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(Collections.singletonMap(partition, createRecords(partition, failedMessageId.get().offset(), 1))));
--- End diff ---

> return generic ConsumerRecords to avoid warning around raw type

In this case I am getting an incompatible bounds error on the `V` param to `singletonMap()`.
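
For what it's worth, here is a rough sketch of workarounds that may get the inference to cooperate (not verified against this branch; it assumes `consumerMock` is declared as a mock of `KafkaConsumer<String, String>` and that `createRecords` returns `List<ConsumerRecord<String, String>>` as in the diff above). The idea is to give `singletonMap` a fully resolved target type, either via a typed local variable or an explicit type witness, so `ConsumerRecords` no longer has to be raw:

```java
// Sketch only; assumes consumerMock is a KafkaConsumer<String, String> mock and
// createRecords(...) returns List<ConsumerRecord<String, String>> as in this test.
// Needs java.util.{Collections, List, Map}, org.apache.kafka.common.TopicPartition,
// and org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}.

// Option 1: bind singletonMap's type parameters through a typed local variable,
// so the ConsumerRecords constructor sees a non-raw, fully typed map.
Map<TopicPartition, List<ConsumerRecord<String, String>>> retryRecords =
    Collections.singletonMap(partition, createRecords(partition, failedMessageId.get().offset(), 1));
when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords<String, String>(retryRecords));

// Option 2: pass explicit type witnesses to singletonMap instead of relying on
// nested inference inside the ConsumerRecords constructor call.
when(consumerMock.poll(anyLong()))
    .thenReturn(new ConsumerRecords<String, String>(
        Collections.<TopicPartition, List<ConsumerRecord<String, String>>>singletonMap(
            partition, createRecords(partition, failedMessageId.get().offset(), 1))));
```

If neither form compiles cleanly with the kafka-clients version on this branch, keeping the raw `ConsumerRecords` with a suppression in the test seems reasonable to me.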