Repository: kafka Updated Branches: refs/heads/0.10.1 027455146 -> c310a1bc6
KAFKA-4547; Avoid unnecessary offset commit that could lead to an invalid offset position if partition is paused Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2431 from vahidhashemian/KAFKA-4547-0.10.1 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c310a1bc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c310a1bc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c310a1bc Branch: refs/heads/0.10.1 Commit: c310a1bc658bf4cea146e620f988ff19589bca14 Parents: 0274551 Author: Vahid Hashemian <vahidhashem...@us.ibm.com> Authored: Tue Jan 24 15:30:07 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Jan 24 15:30:07 2017 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 6 +- .../clients/consumer/internals/Fetcher.java | 2 +- .../consumer/internals/SubscriptionState.java | 14 ++++- .../clients/consumer/KafkaConsumerTest.java | 55 +++++++++++++++++ .../clients/consumer/internals/FetcherTest.java | 62 ++++++++++++++++++++ 5 files changed, 132 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b384211..d309111 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1523,9 +1523,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // the user is manually assigning partitions and managing their own offsets). fetcher.resetOffsetsIfNeeded(partitions); - if (!subscriptions.hasAllFetchPositions()) { - // if we still don't have offsets for all partitions, then we should either seek - // to the last committed position or reset using the auto reset policy + if (!subscriptions.hasAllFetchPositions(partitions)) { + // if we still don't have offsets for the given partitions, then we should either + // seek to the last committed position or reset using the auto reset policy // first refresh commits for all assigned partitions coordinator.refreshCommittedOffsetsIfNeeded(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index fdcfc30..3b8a81c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -206,7 +206,7 @@ public class Fetcher<K, V> { public void updateFetchPositions(Set<TopicPartition> partitions) { // reset the fetch position to the committed position for (TopicPartition tp : partitions) { - if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp)) + if (!subscriptions.isAssigned(tp) || subscriptions.hasValidPosition(tp)) continue; if (subscriptions.isOffsetResetNeeded(tp)) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 6dc2060..12830ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -341,13 +341,17 @@ public class SubscriptionState { return assignedState(partition).resetStrategy; } - public boolean hasAllFetchPositions() { - for (TopicPartitionState state : assignment.partitionStateValues()) - if (!state.hasValidPosition()) + public boolean hasAllFetchPositions(Collection<TopicPartition> partitions) { + for (TopicPartition partition : partitions) + if (!hasValidPosition(partition)) return false; return true; } + public boolean hasAllFetchPositions() { + return hasAllFetchPositions(this.assignedPartitions()); + } + public Set<TopicPartition> missingFetchPositions() { Set<TopicPartition> missing = new HashSet<>(); for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) { @@ -369,6 +373,10 @@ public class SubscriptionState { return isAssigned(tp) && assignedState(tp).isFetchable(); } + public boolean hasValidPosition(TopicPartition tp) { + return isAssigned(tp) && assignedState(tp).hasValidPosition(); + } + public void pause(TopicPartition tp) { assignedState(tp).pause(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index bf45ee6..05d48c5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -56,6 +56,7 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.MockConsumerInterceptor; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.TestUtils; @@ -1035,6 +1036,60 @@ public class KafkaConsumerTest { consumer.close(); } + @Test + public void testOffsetOfPausedPartitions() { + int rebalanceTimeoutMs = 60000; + int sessionTimeoutMs = 30000; + int heartbeatIntervalMs = 3000; + int autoCommitIntervalMs = 1000; + + Time time = new MockTime(); + Cluster cluster = TestUtils.singletonCluster(topic, 2); + Node node = cluster.nodes().get(0); + + Metadata metadata = new Metadata(0, Long.MAX_VALUE); + metadata.update(cluster, time.milliseconds()); + + MockClient client = new MockClient(time, metadata); + client.setNode(node); + PartitionAssignor assignor = new RangeAssignor(); + + final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, + rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs); + + // lookup coordinator + client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node); + Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); + + // manual assignment + Set<TopicPartition> partitions = Utils.mkSet(tp0, tp1); + consumer.assign(partitions); + // verify consumer's assignment + assertTrue(consumer.assignment().equals(partitions)); + + consumer.pause(partitions); + consumer.seekToEnd(partitions); + + // fetch and verify committed offset of two partitions + Map<TopicPartition, Long> offsets = new HashMap<>(); + offsets.put(tp0, 0L); + offsets.put(tp1, 0L); + + client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator); + assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(tp1).offset()); + + // fetch and verify consumer's position in the two partitions + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE.code())); + client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE.code())); + assertEquals(3L, consumer.position(tp0)); + assertEquals(3L, consumer.position(tp1)); + + client.requests().clear(); + consumer.unsubscribe(); + consumer.close(); + } + @Test(expected = IllegalStateException.class) public void testPollWithNoSubscription() { KafkaConsumer<byte[], byte[]> consumer = newConsumer(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 5822646..3e3a0e1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -540,6 +540,68 @@ public class FetcherTest { } @Test + public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() { + subscriptions.assignFromUser(singleton(tp)); + subscriptions.committed(tp, new OffsetAndMetadata(0)); + subscriptions.pause(tp); // paused partition does not have a valid position + subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); + + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), + listOffsetResponse(Errors.NONE, 1L, 10L)); + fetcher.updateFetchPositions(singleton(tp)); + + assertFalse(subscriptions.isOffsetResetNeeded(tp)); + assertFalse(subscriptions.isFetchable(tp)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp)); + assertEquals(10, subscriptions.position(tp).longValue()); + } + + @Test + public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() { + subscriptions.assignFromUser(singleton(tp)); + subscriptions.pause(tp); // paused partition does not have a valid position + + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), + listOffsetResponse(Errors.NONE, 1L, 0L)); + fetcher.updateFetchPositions(singleton(tp)); + + assertFalse(subscriptions.isOffsetResetNeeded(tp)); + assertFalse(subscriptions.isFetchable(tp)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp)); + assertEquals(0, subscriptions.position(tp).longValue()); + } + + @Test + public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() { + subscriptions.assignFromUser(singleton(tp)); + subscriptions.committed(tp, new OffsetAndMetadata(0)); + subscriptions.pause(tp); // paused partition does not have a valid position + subscriptions.seek(tp, 10); + + fetcher.updateFetchPositions(singleton(tp)); + + assertFalse(subscriptions.isOffsetResetNeeded(tp)); + assertFalse(subscriptions.isFetchable(tp)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp)); + assertEquals(10, subscriptions.position(tp).longValue()); + } + + @Test + public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() { + subscriptions.assignFromUser(singleton(tp)); + subscriptions.committed(tp, new OffsetAndMetadata(0)); + subscriptions.seek(tp, 10); + subscriptions.pause(tp); // paused partition already has a valid position + + fetcher.updateFetchPositions(singleton(tp)); + + assertFalse(subscriptions.isOffsetResetNeeded(tp)); + assertFalse(subscriptions.isFetchable(tp)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp)); + assertEquals(10, subscriptions.position(tp).longValue()); + } + + @Test public void testGetAllTopics() { // sending response before request, as getTopicMetadata is a blocking call client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());