Repository: kafka Updated Branches: refs/heads/trunk 90dc2d16f -> 4d89db968
KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions Before this patch, the consumer would return the cached offsets for partitions in its current assignment. This worked when all the offset commits went through the consumer. With KIP-98, offsets can be committed transactionally through the producer. This means that relying on cached positions in the consumer returns incorrect information: since commits go through the producer, the cache is never updated. Hence we need to update the `KafkaConsumer.committed` method to always look up the last committed offset on the server to ensure it gets the correct information every time. Author: Apurva Mehta <[email protected]> Reviewers: Jason Gustafson, Guozhang Wang Closes #3119 from apurvam/KAFKA-5273-kafkaconsumer-committed-should-always-hit-server Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d89db96 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d89db96 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d89db96 Branch: refs/heads/trunk Commit: 4d89db968257c0c3273ece7e5546cb95ddcbeb46 Parents: 90dc2d1 Author: Apurva Mehta <[email protected]> Authored: Tue May 23 23:08:57 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Tue May 23 23:08:57 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 18 +++--------------- .../kafka/clients/consumer/OffsetAndMetadata.java | 14 ++++++++++---- .../kafka/clients/consumer/KafkaConsumerTest.java | 10 ++++++++-- .../kafka/api/TransactionsBounceTest.scala | 5 ++--- .../integration/kafka/api/TransactionsTest.scala | 8 +++----- .../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++++++++ 6 files changed, 36 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d89db96/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6bcc086..3c6d409 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1292,8 +1292,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * Get the last committed offset for the given partition (whether the commit happened by this process or * another). This offset will be used as the position for the consumer in the event of a failure. * <p> - * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the - * consumer hasn't yet initialized its cache of committed offsets. + * This call will block to do a remote call to get the latest committed offsets from the server. 
* * @param partition The partition to check * @return The last committed offset and metadata or null if there was no prior commit @@ -1309,19 +1308,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public OffsetAndMetadata committed(TopicPartition partition) { acquire(); try { - OffsetAndMetadata committed; - if (subscriptions.isAssigned(partition)) { - committed = this.subscriptions.committed(partition); - if (committed == null) { - coordinator.refreshCommittedOffsetsIfNeeded(); - committed = this.subscriptions.committed(partition); - } - } else { - Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition)); - committed = offsets.get(partition); - } - - return committed; + Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition)); + return offsets.get(partition); } finally { release(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d89db96/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java index 9d06f29..262d8f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.common.requests.OffsetFetchResponse; + import java.io.Serializable; /** @@ -34,7 +36,12 @@ public class OffsetAndMetadata implements Serializable { */ public OffsetAndMetadata(long offset, String metadata) { this.offset = offset; - this.metadata = metadata; + // The server converts null metadata to an empty string. So we store it as an empty string as well on the client + // to be consistent. 
+ if (metadata == null) + this.metadata = OffsetFetchResponse.NO_METADATA; + else + this.metadata = metadata; } /** @@ -62,14 +69,13 @@ public class OffsetAndMetadata implements Serializable { OffsetAndMetadata that = (OffsetAndMetadata) o; if (offset != that.offset) return false; - return metadata == null ? that.metadata == null : metadata.equals(that.metadata); - + return metadata.equals(that.metadata); } @Override public int hashCode() { int result = (int) (offset ^ (offset >>> 32)); - result = 31 * result + (metadata != null ? metadata.hashCode() : 0); + result = 31 * result + metadata.hashCode(); return result; } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d89db96/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 5928a28..9117f71 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -510,10 +510,12 @@ public class KafkaConsumerTest { // fetch offset for two topics Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(tp0, offset1); - offsets.put(tp1, offset2); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(offset1, consumer.committed(tp0).offset()); + + offsets.remove(tp0); + offsets.put(tp1, offset2); + client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); assertEquals(offset2, consumer.committed(tp1).offset()); } @@ -1149,6 +1151,10 @@ public class KafkaConsumerTest { client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); assertEquals(0, consumer.committed(tp0).offset()); + + offsets.remove(tp0); + offsets.put(tp1, 0L); + 
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); assertEquals(0, consumer.committed(tp1).offset()); // fetch and verify consumer's position in the two partitions http://git-wip-us.apache.org/repos/asf/kafka/blob/4d89db96/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala index 110e680..5d46348 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala @@ -83,7 +83,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers) - var consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic)) + val consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic)) val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers) val scheduler = new BounceScheduler @@ -110,8 +110,7 @@ class TransactionsBounceTest extends KafkaServerTestHarness { if (shouldAbort) { trace(s"Committed offsets. Aborting transaction of ${records.size} messages.") producer.abortTransaction() - consumer.close() - consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic)) + TestUtils.resetToCommittedPositions(consumer) } else { trace(s"Committed offsets. 
committing transaction of ${records.size} messages.") producer.commitTransaction() http://git-wip-us.apache.org/repos/asf/kafka/blob/4d89db96/core/src/test/scala/integration/kafka/api/TransactionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 1832dc2..ec6b3ea 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -116,7 +116,7 @@ class TransactionsTest extends KafkaServerTestHarness { val producer = TestUtils.createTransactionalProducer(transactionalId, servers) - var consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) + val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) consumer.subscribe(List(topic1)) producer.initTransactions() @@ -145,10 +145,8 @@ class TransactionsTest extends KafkaServerTestHarness { producer.abortTransaction() debug(s"aborted transaction Last committed record: ${new String(records.last.value(), "UTF-8")}. 
Num " + s"records written to $topic2: $recordsProcessed") - consumer.close() - consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4) - consumer.subscribe(List(topic1)) - } + TestUtils.resetToCommittedPositions(consumer) + } } } finally { producer.close() http://git-wip-us.apache.org/repos/asf/kafka/blob/4d89db96/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5097637..70c340b 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1413,6 +1413,16 @@ object TestUtils extends Logging { records } + def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = { + consumer.assignment.foreach { case(topicPartition) => + val offset = consumer.committed(topicPartition) + if (offset != null) + consumer.seek(topicPartition, offset.offset) + else + consumer.seekToBeginning(Collections.singletonList(topicPartition)) + } + } + } class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
