STORM-2607: Switch OffsetManager to track earliest uncommitted offset instead of last committed offset for compatibility with commitSync consumer API
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d7bdc2d3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d7bdc2d3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d7bdc2d3 Branch: refs/heads/master Commit: d7bdc2d34b1a09a14c76381284edaf6236096457 Parents: 14e98e7 Author: Stig Rohde Døssing <[email protected]> Authored: Sat Jul 1 17:29:56 2017 +0200 Committer: Stig Rohde Døssing <[email protected]> Committed: Wed Oct 11 19:27:22 2017 +0200 ---------------------------------------------------------------------- .../apache/storm/kafka/spout/KafkaSpout.java | 6 +- .../kafka/spout/internal/OffsetManager.java | 88 ++++++------ .../storm/kafka/spout/KafkaSpoutCommitTest.java | 9 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 21 +-- .../spout/KafkaSpoutMessagingGuaranteeTest.java | 8 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 15 ++- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 10 +- .../kafka/spout/MaxUncommittedOffsetTest.java | 8 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 40 +++--- .../spout/SingleTopicKafkaUnitSetupHelper.java | 2 +- .../SpoutWithMockedConsumerSetupHelper.java | 2 +- .../kafka/spout/internal/OffsetManagerTest.java | 135 +++++++++++++++---- 12 files changed, 226 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 2428cd0..9253a2d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -197,7 +197,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } /** - * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. + * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset. */ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { if (committedOffset != null) { // offset was committed for this TopicPartition @@ -206,8 +206,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout { } else if (firstPollOffsetStrategy.equals(LATEST)) { kafkaConsumer.seekToEnd(Collections.singleton(tp)); } else { - // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. - kafkaConsumer.seek(tp, committedOffset.offset() + 1); + // By default polling starts at the last committed offset, i.e. the first offset that was not marked as processed. 
+ kafkaConsumer.seek(tp, committedOffset.offset()); } } else { // no commits have ever been done, so start at the beginning or end depending on the strategy if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java index 8d6fbce..56f6701 100755 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java @@ -16,6 +16,7 @@ package org.apache.storm.kafka.spout.internal; +import com.google.common.annotations.VisibleForTesting; import java.util.Comparator; import java.util.Iterator; import java.util.NavigableSet; @@ -38,7 +39,7 @@ public class OffsetManager { /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ private final long initialFetchOffset; - // Last offset committed to Kafka. Initially it is set to fetchOffset - 1 + // Committed offset, i.e. the offset where processing will resume if the spout restarts. Initially it is set to fetchOffset. 
private long committedOffset; // Emitted Offsets List private final NavigableSet<Long> emittedOffsets = new TreeSet<>(); @@ -53,8 +54,8 @@ public class OffsetManager { public OffsetManager(TopicPartition tp, long initialFetchOffset) { this.tp = tp; this.initialFetchOffset = initialFetchOffset; - this.committedOffset = initialFetchOffset - 1; - LOG.debug("Instantiated {}", this); + this.committedOffset = initialFetchOffset; + LOG.debug("Instantiated {}", this.toString()); } public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N) @@ -66,9 +67,11 @@ public class OffsetManager { } /** - * An offset is only committed when all records with lower offset have been + * An offset can only be committed when all emitted records with lower offset have been * acked. This guarantees that all offsets smaller than the committedOffset - * have been delivered. + * have been delivered, or that those offsets no longer exist in Kafka. + * <p/> + * The returned offset points to the earliest uncommitted offset, and matches the semantics of the KafkaConsumer.commitSync API. * * @return the next OffsetAndMetadata to commit, or null if no offset is * ready to commit. 
@@ -76,76 +79,77 @@ public class OffsetManager { public OffsetAndMetadata findNextCommitOffset() { long currOffset; long nextCommitOffset = committedOffset; - long lastOffMessageOffset = committedOffset; KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap currOffset = currAckedMsg.offset(); - if (currOffset == lastOffMessageOffset + 1) { // found the next offset to commit + if (currOffset == nextCommitOffset) { // found the next offset to commit nextCommitMsg = currAckedMsg; - lastOffMessageOffset = currOffset; - nextCommitOffset = lastOffMessageOffset + 1; - } else if (currOffset > lastOffMessageOffset + 1) { - if (emittedOffsets.contains(lastOffMessageOffset + 1)) { - LOG.debug("topic-partition [{}] has non-continuous offset [{}]." + nextCommitOffset = currOffset + 1; + } else if (currOffset > nextCommitOffset) { + if (emittedOffsets.contains(nextCommitOffset)) { + LOG.debug("topic-partition [{}] has non-sequential offset [{}]." + " It will be processed in a subsequent batch.", tp, currOffset); break; } else { /* - This case will arise in case of non contiguous offset being processed. - So, if the topic doesn't contain offset = committedOffset + 1 (possible + This case will arise in case of non-sequential offset being processed. + So, if the topic doesn't contain offset = nextCommitOffset (possible if the topic is compacted or deleted), the consumer should jump to the next logical point in the topic. Next logical offset should be the - first element after committedOffset in the ascending ordered emitted set. + first element after nextCommitOffset in the ascending ordered emitted set. */ - LOG.debug("Processed non contiguous offset." - + " (committedOffset+1) is no longer part of the topic." 
- + " Committed: [{}], Processed: [{}]", committedOffset, currOffset); - final Long nextEmittedOffset = emittedOffsets.ceiling(lastOffMessageOffset); + LOG.debug("Processed non-sequential offset." + + " The earliest uncommitted offset is no longer part of the topic." + + " Missing offset: [{}], Processed: [{}]", nextCommitOffset, currOffset); + final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset); if (nextEmittedOffset != null && currOffset == nextEmittedOffset) { + LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset", + currOffset, nextCommitOffset); nextCommitMsg = currAckedMsg; - lastOffMessageOffset = currOffset; - nextCommitOffset = lastOffMessageOffset + 1; + nextCommitOffset = currOffset + 1; } else { - LOG.debug("topic-partition [{}] has non-continuous offset [{}]." - + " Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset); + LOG.debug("Topic-partition [{}] has non-sequential offset [{}]." + + " Next offset to commit should be [{}]", tp, currOffset, nextCommitOffset); break; } } } else { - throw new IllegalStateException("The offset [" + currOffset + "] is below the current committed " - + "offset [" + committedOffset + "] for [" + tp + "]." + throw new IllegalStateException("The offset [" + currOffset + "] is below the current nextCommitOffset " + + "[" + nextCommitOffset + "] for [" + tp + "]." 
+ " This should not be possible, and likely indicates a bug in the spout's acking or emit logic."); } } OffsetAndMetadata nextCommitOffsetAndMetadata = null; if (nextCommitMsg != null) { - nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread())); - LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed", - tp, earliestUncommittedOffset, nextCommitOffsetAndMetadata.offset() - 1); + nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, + nextCommitMsg.getMetadata(Thread.currentThread())); + LOG.debug("Topic-partition [{}] has offsets [{}-{}] ready to be committed." + + " Processing will resume at [{}] if the spout restarts", + tp, committedOffset, nextCommitOffsetAndMetadata.offset() - 1, nextCommitOffsetAndMetadata.offset()); } else { - LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp); + LOG.debug("Topic-partition [{}] has no offsets ready to be committed", tp); } LOG.trace("{}", this); return nextCommitOffsetAndMetadata; } /** - * Marks an offset has committed. This method has side effects - it sets the + * Marks an offset as committed. This method has side effects - it sets the * internal state in such a way that future calls to - * {@link #findNextCommitOffset()} will return offsets greater than the + * {@link #findNextCommitOffset()} will return offsets greater than or equal to the * offset specified, if any. * - * @param committedOffset offset to be marked as committed + * @param committedOffset The committed offset. All lower offsets are expected to have been committed. 
* @return Number of offsets committed in this commit */ public long commit(OffsetAndMetadata committedOffset) { - final long preCommitCommittedOffsets = this.committedOffset; - final long numCommittedOffsets = committedOffset.offset() - this.committedOffset - 1; + final long preCommitCommittedOffset = this.committedOffset; + long numCommittedOffsets = 0; this.committedOffset = committedOffset.offset(); for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) { - if (iterator.next().offset() <= committedOffset.offset()) { + if (iterator.next().offset() < committedOffset.offset()) { iterator.remove(); numCommittedOffsets++; } else { @@ -154,7 +158,7 @@ public class OffsetManager { } for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) { - if (iterator.next() <= committedOffset.offset()) { + if (iterator.next() < committedOffset.offset()) { iterator.remove(); } else { break; @@ -163,8 +167,9 @@ public class OffsetManager { LOG.trace("{}", this); - LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}].", - numCommittedOffsets, preCommitCommittedOffsets + 1, this.committedOffset, tp); + LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}]." 
+ + " Processing will resume at offset [{}] if the spout restarts.", + numCommittedOffsets, preCommitCommittedOffset, this.committedOffset - 1, tp, this.committedOffset); return numCommittedOffsets; } @@ -184,9 +189,14 @@ public class OffsetManager { public boolean contains(KafkaSpoutMessageId msgId) { return ackedMsgs.contains(msgId); } + + @VisibleForTesting + boolean containsEmitted(long offset) { + return emittedOffsets.contains(offset); + } @Override - public String toString() { + public final String toString() { return "OffsetManager{" + "topic-partition=" + tp + ", fetchOffset=" + initialFetchOffset http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java index a356cb5..0714d37 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java @@ -39,9 +39,10 @@ import org.mockito.InOrder; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -94,7 +95,7 @@ public class KafkaSpoutCommitTest { } ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - 
verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture()); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.ack(messageId); @@ -110,10 +111,10 @@ public class KafkaSpoutCommitTest { inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 9 was last committed offset + //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue(); assertTrue(commits.containsKey(partition)); - assertEquals(9, commits.get(partition).offset()); + assertEquals(10, commits.get(partition).offset()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java index 756ea1a..74341da 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java @@ -15,8 +15,6 @@ */ package org.apache.storm.kafka.spout; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -45,6 +43,11 @@ import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import static 
org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyObject; +import static org.mockito.ArgumentMatchers.anyString; public class KafkaSpoutEmitTest { @@ -81,7 +84,7 @@ public class KafkaSpoutEmitTest { spout.nextTuple(); - verify(collectorMock, times(1)).emit(anyObject(), anyObject(), anyObject()); + verify(collectorMock, times(1)).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); } @Test @@ -107,7 +110,7 @@ public class KafkaSpoutEmitTest { } ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture()); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.fail(messageId); @@ -122,7 +125,7 @@ public class KafkaSpoutEmitTest { } ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), retryMessageIds.capture()); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), retryMessageIds.capture()); //Verify that the poll started at the earliest retriable tuple offset List<Long> failedOffsets = new ArrayList<>(); @@ -182,7 +185,7 @@ public class KafkaSpoutEmitTest { } ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture()); + verify(collectorMock, 
times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1); spout.fail(failedMessageId); @@ -197,7 +200,7 @@ public class KafkaSpoutEmitTest { } ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(maxPollRecords)).emit(anyObject(), anyObject(), retryBatchMessageIdsCaptor.capture()); + verify(collectorMock, times(maxPollRecords)).emit(anyString(), anyList(), retryBatchMessageIdsCaptor.capture()); reset(collectorMock); //Check that the consumer started polling at the failed tuple offset @@ -215,12 +218,12 @@ public class KafkaSpoutEmitTest { for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) { spout.nextTuple(); } - verify(collectorMock, never()).emit(anyObject(), anyObject(), anyObject()); + verify(collectorMock, never()).emit(any(), any(), any()); //Fail the last tuple, which brings the number of nonretriable tuples back under the limit, and check that the spout polls again spout.fail(firstTupleFromRetryBatch); spout.nextTuple(); - verify(collectorMock, times(1)).emit(anyObject(), anyObject(), anyObject()); + verify(collectorMock, times(1)).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class)); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java index 1d96232..cc24261 100644 --- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java @@ -20,10 +20,10 @@ import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutC import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyList; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java index c90bd0a..830ac28 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java @@ -18,9 +18,6 @@ package org.apache.storm.kafka.spout; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.hasKey; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyObject; import static 
org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -53,7 +50,11 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import java.util.HashSet; import java.util.Set; @@ -106,11 +107,11 @@ public class KafkaSpoutRebalanceTest { //Emit the messages spout.nextTuple(); ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture()); + verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture()); reset(collectorMock); spout.nextTuple(); ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture()); + verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture()); //Now rebalance consumerRebalanceListener.onPartitionsRevoked(assignedPartitions); @@ -241,6 +242,6 @@ public class KafkaSpoutRebalanceTest { //This partition was previously assigned, so the consumer position shouldn't change verify(consumerMock, never()).seek(eq(assignedPartition), anyLong()); //This partition is new, and should start at the committed offset - verify(consumerMock).seek(newPartition, committedOffset + 1); + verify(consumerMock).seek(newPartition, committedOffset); } } 
http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java index ec557e7..6df6bc4 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java @@ -17,7 +17,6 @@ package org.apache.storm.kafka.spout; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -45,6 +44,9 @@ import org.mockito.InOrder; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; public class KafkaSpoutRetryLimitTest { @@ -94,7 +96,7 @@ public class KafkaSpoutRetryLimitTest { } ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture()); + verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture()); for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) { spout.fail(messageId); @@ -108,9 +110,9 @@ public class KafkaSpoutRetryLimitTest { 
inOrder.verify(consumerMock).commitSync(commitCapture.capture()); inOrder.verify(consumerMock).poll(anyLong()); - //verify that Offset 3 was committed for the given TopicPartition + //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4. assertTrue(commitCapture.getValue().containsKey(partition)); - assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); + assertEquals(lastOffset + 1, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java index 3afb498..84d8f23 100755 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java @@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -44,6 +43,9 @@ import org.mockito.ArgumentCaptor; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyObject; +import static 
org.mockito.ArgumentMatchers.anyString; public class MaxUncommittedOffsetTest { @@ -212,8 +214,8 @@ public class MaxUncommittedOffsetTest { } ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); verify(collector, times(allowedPolls*maxPollRecords)).emit( - anyObject(), - anyObject(), + anyString(), + anyList(), thirdRunMessageIds.capture()); reset(collector); http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index b4137dc..66216dd 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -19,8 +19,6 @@ package org.apache.storm.kafka.spout; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -53,8 +51,10 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder; -import static org.mockito.Matchers.anyList; -import static org.mockito.Matchers.anyString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.hamcrest.Matchers; @@ -118,7 +118,7 @@ public class SingleTopicKafkaSpoutTest { spout.nextTuple(); } ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyObject(), anyObject(), messageIdCaptor.capture()); + verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture()); List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues(); for (int i = 1; i < messageIds.size(); i++) { spout.ack(messageIds.get(i)); @@ -130,7 +130,7 @@ public class SingleTopicKafkaSpoutTest { reset(collector); spout.nextTuple(); ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector).emit(anyObject(), anyObject(), failedIdReplayCaptor.capture()); + verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture()); assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple)); @@ -145,7 +145,7 @@ public class SingleTopicKafkaSpoutTest { Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue(); TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0); assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp)); - assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount - 1)); + assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount)); /* Verify that the following acked (now committed) tuples are not emitted again * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened, @@ -156,7 +156,7 @@ public class SingleTopicKafkaSpoutTest { for(int i = 0; i < 3; i++) { spout.nextTuple(); } - verify(collector, 
never()).emit(anyString(), anyList(), anyObject()); + verify(collector, never()).emit(any(), any(), any()); } } @@ -169,7 +169,7 @@ public class SingleTopicKafkaSpoutTest { //play 1st tuple ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture()); + verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture()); spout.ack(messageIdToDoubleAck.getValue()); //Emit some more messages @@ -187,7 +187,7 @@ public class SingleTopicKafkaSpoutTest { //Verify that all messages are emitted, ack all the messages ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class); verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyObject(), + anyList(), messageIds.capture()); messageIds.getAllValues().iterator().forEachRemaining(spout::ack); @@ -236,14 +236,14 @@ public class SingleTopicKafkaSpoutTest { //play and ack 1 tuple ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyObject(), anyObject(), messageIdAcked.capture()); + verify(collector).emit(anyString(), anyList(), messageIdAcked.capture()); spout.ack(messageIdAcked.getValue()); reset(collector); //play and fail 1 tuple ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture()); + verify(collector).emit(anyString(), anyList(), messageIdFailed.capture()); spout.fail(messageIdFailed.getValue()); reset(collector); @@ -256,7 +256,7 @@ public class SingleTopicKafkaSpoutTest { //All messages except the first acked message should have been emitted verify(collector, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyObject(), + anyList(), remainingMessageIds.capture()); 
remainingMessageIds.getAllValues().iterator().forEachRemaining(spout::ack); @@ -277,13 +277,13 @@ public class SingleTopicKafkaSpoutTest { //play 1st tuple ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyObject(), anyObject(), messageIdToFail.capture()); + verify(collector).emit(anyString(), anyList(), messageIdToFail.capture()); reset(collector); //play 2nd tuple ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class); spout.nextTuple(); - verify(collector).emit(anyObject(), anyObject(), messageIdToAck.capture()); + verify(collector).emit(anyString(), anyList(), messageIdToAck.capture()); reset(collector); //ack 2nd tuple @@ -300,7 +300,7 @@ public class SingleTopicKafkaSpoutTest { //All messages except the first acked message should have been emitted verify(collector, times(messageCount - 1)).emit( eq(SingleTopicKafkaSpoutConfiguration.STREAM), - anyObject(), + anyList(), remainingIds.capture()); remainingIds.getAllValues().iterator().forEachRemaining(spout::ack); @@ -324,7 +324,7 @@ public class SingleTopicKafkaSpoutTest { spout.nextTuple(); } ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); - verify(collector, times(messageCount)).emit(anyObject(), anyObject(), messageIds.capture()); + verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture()); reset(collector); //Fail tuple 5 and 3, call nextTuple, then fail tuple 2 List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues(); @@ -338,7 +338,7 @@ public class SingleTopicKafkaSpoutTest { for (int i = 0; i < messageCount; i++) { spout.nextTuple(); } - verify(collector, times(3)).emit(anyObject(), anyObject(), reemittedMessageIds.capture()); + verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture()); Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>(); 
expectedReemitIds.add(capturedMessageIds.get(5)); expectedReemitIds.add(capturedMessageIds.get(3)); @@ -356,7 +356,7 @@ public class SingleTopicKafkaSpoutTest { for (int i = 0; i <= maxRetries; i++) { ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class); spout.nextTuple(); - verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture()); + verify(collector).emit(anyString(), anyList(), messageIdFailed.capture()); KafkaSpoutMessageId msgId = messageIdFailed.getValue(); spout.fail(msgId); assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1)); @@ -365,6 +365,6 @@ public class SingleTopicKafkaSpoutTest { //Verify that the tuple is not emitted again spout.nextTuple(); - verify(collector, never()).emit(anyObject(), anyObject(), anyObject()); + verify(collector, never()).emit(any(), any(), any()); } } http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java index 1c2158f..4255e31 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java @@ -16,7 +16,7 @@ package org.apache.storm.kafka.spout; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; import java.util.Collections; 
http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java index a5d3c54..f43616d 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java @@ -16,7 +16,7 @@ package org.apache.storm.kafka.spout; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; http://git-wip-us.apache.org/repos/asf/storm/blob/d7bdc2d3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java index e8896c9..abbacf9 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java @@ -21,6 +21,8 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; import java.util.NoSuchElementException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import 
org.apache.storm.kafka.spout.KafkaSpoutMessageId; import org.junit.Rule; @@ -31,54 +33,141 @@ public class OffsetManagerTest { @Rule public ExpectedException expect = ExpectedException.none(); + + private final long initialFetchOffset = 0; + private final TopicPartition testTp = new TopicPartition("testTopic", 0); + private final OffsetManager manager = new OffsetManager(testTp, initialFetchOffset); @Test public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapInMiddleOfAcked() { - /*If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets + /* If topic compaction is enabled in Kafka, we sometimes need to commit past a gap of deleted offsets * Since the Kafka consumer should return offsets in order, we can assume that if a message is acked * then any prior message will have been emitted at least once. * If we see an acked message and some of the offsets preceding it were not emitted, they must have been compacted away and should be skipped. 
*/ - - TopicPartition tp = new TopicPartition("test", 0); - OffsetManager manager = new OffsetManager(tp, 0); - manager.addToEmitMsgs(0); manager.addToEmitMsgs(1); manager.addToEmitMsgs(2); //3, 4 compacted away - manager.addToEmitMsgs(5); - manager.addToEmitMsgs(6); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0)); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 1)); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2)); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + manager.addToEmitMsgs(initialFetchOffset + 5); + manager.addToEmitMsgs(initialFetchOffset + 6); + manager.addToAckMsgs(getMessageId(initialFetchOffset)); + manager.addToAckMsgs(getMessageId(initialFetchOffset + 1)); + manager.addToAckMsgs(getMessageId(initialFetchOffset + 2)); + manager.addToAckMsgs(getMessageId(initialFetchOffset + 6)); - assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(2L)); + assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3)); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + manager.addToAckMsgs(getMessageId(initialFetchOffset + 5)); assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", - manager.findNextCommitOffset().offset(), is(6L)); + manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7)); } @Test public void testSkipMissingOffsetsWhenFindingNextCommitOffsetWithGapBeforeAcked() { - - TopicPartition tp = new TopicPartition("test", 0); - OffsetManager manager = new OffsetManager(tp, 0); - //0-4 compacted away - manager.addToEmitMsgs(5); - manager.addToEmitMsgs(6); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 6)); + manager.addToEmitMsgs(initialFetchOffset + 5); + manager.addToEmitMsgs(initialFetchOffset + 6); + manager.addToAckMsgs(getMessageId(initialFetchOffset + 6)); assertThat("The 
offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue())); - manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 5)); + manager.addToAckMsgs(getMessageId(initialFetchOffset + 5)); assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", - manager.findNextCommitOffset().offset(), is(6L)); + manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7)); + } + + @Test + public void testFindNextCommittedOffsetWithNoAcks() { + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue())); + } + + @Test + public void testFindNextCommitOffsetWithOneAck() { + /* + * The KafkaConsumer commitSync API docs: "The committed offset should be the next message your application will consume, i.e. + * lastProcessedMessageOffset + 1. " + */ + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1)); + } + + @Test + public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() { + emitAndAckMessage(getMessageId(initialFetchOffset + 1)); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2)); + } + + @Test + public void testFindNextCommitOffsetWithAckedOffsetGap() { + emitAndAckMessage(getMessageId(initialFetchOffset + 2)); + manager.addToEmitMsgs(initialFetchOffset + 1); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit 
offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1)); + } + + @Test + public void testFindNextOffsetWithAckedButNotEmittedOffsetGap() { + /** + * If topic compaction is enabled in Kafka some offsets may be deleted. + * We distinguish this case from regular gaps in the acked offset sequence caused by out of order acking + * by checking that offsets in the gap have been emitted at some point previously. + * If they haven't then they can't exist in Kafka, since the spout emits tuples in order. + */ + emitAndAckMessage(getMessageId(initialFetchOffset + 2)); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist", + nextCommitOffset.offset(), is(initialFetchOffset + 3)); + } + + @Test + public void testFindNextCommitOffsetWithUnackedOffsetGap() { + manager.addToEmitMsgs(initialFetchOffset + 1); + emitAndAckMessage(getMessageId(initialFetchOffset)); + OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(); + assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1)); + } + + @Test + public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() { + OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10); + emitAndAckMessage(getMessageId(0)); + OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(); + assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue())); + } + + @Test + public void testCommit() { + emitAndAckMessage(getMessageId(initialFetchOffset)); + emitAndAckMessage(getMessageId(initialFetchOffset + 1)); + emitAndAckMessage(getMessageId(initialFetchOffset + 2)); + + long committedMessages = manager.commit(new 
OffsetAndMetadata(initialFetchOffset + 2)); + + assertThat("Should have committed all messages to the left of the earliest uncommitted offset", committedMessages, is(2L)); + assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset)), is(false)); + assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset), is(false)); + assertThat("The committed messages should not be in the acked list anymore", manager.contains(getMessageId(initialFetchOffset + 1)), is(false)); + assertThat("The committed messages should not be in the emitted list anymore", manager.containsEmitted(initialFetchOffset + 1), is(false)); + assertThat("The uncommitted message should still be in the acked list", manager.contains(getMessageId(initialFetchOffset + 2)), is(true)); + assertThat("The uncommitted message should still be in the emitted list", manager.containsEmitted(initialFetchOffset + 2), is(true)); + } + + private KafkaSpoutMessageId getMessageId(long offset) { + return new KafkaSpoutMessageId(testTp, offset); + } + + private void emitAndAckMessage(KafkaSpoutMessageId msgId) { + manager.addToEmitMsgs(msgId.offset()); + manager.addToAckMsgs(msgId); } }
