[ https://issues.apache.org/jira/browse/KAFKA-7000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505305#comment-16505305 ]
ASF GitHub Bot commented on KAFKA-7000: --------------------------------------- vvcephei closed pull request #5142: KAFKA-7000: update assignment in Consumer#position URL: https://github.com/apache/kafka/pull/5142 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5bd6b935b39..e109f88f2a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1611,6 +1611,7 @@ public long position(TopicPartition partition, final Duration timeout) { final long timeoutMs = timeout.toMillis(); acquireAndEnsureOpen(); try { + coordinator.poll(timeout.toMillis()); if (!this.subscriptions.isAssigned(partition)) throw new IllegalStateException("You can only check the position for partitions assigned to this consumer."); Long offset = this.subscriptions.position(partition); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 372cc3ffed6..55234b17e59 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -181,7 +181,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumers += consumer0 var commitCompleted = false - var committedPosition: Long = -1 val listener = new TestConsumerReassignmentListener { override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { @@ -190,8 +189,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // than session timeout and then try a commit. We should still be in the group, // so the commit should succeed Utils.sleep(1500) - committedPosition = consumer0.position(tp) - consumer0.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava) + consumer0.commitSync(Map(tp -> new OffsetAndMetadata(0)).asJava) commitCompleted = true } super.onPartitionsRevoked(partitions) @@ -207,7 +205,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer0.subscribe(List("otherTopic").asJava, listener) consumer0.poll(0) - assertEquals(0, committedPosition) assertTrue(commitCompleted) } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaConsumer.position should wait for assignment metadata > ---------------------------------------------------------- > > Key: KAFKA-7000 > URL: https://issues.apache.org/jira/browse/KAFKA-7000 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: John Roesler > Assignee: John Roesler > Priority: Blocker > Fix For: 2.0.0 > > > While updating Kafka Streams to stop using the deprecated > Consumer.poll(long), I found that this code unexpectedly throws an exception: > {code:java} > consumer.subscribe(topics); > // consumer.poll(0); <- I've removed this line, which shouldn't be necessary > here. > final Set<TopicPartition> partitions = new HashSet<>(); > for (final String topic : topics) { > for (final PartitionInfo partition : consumer.partitionsFor(topic)) { > partitions.add(new TopicPartition(partition.topic(), > partition.partition())); > } > } > for (final TopicPartition tp : partitions) { > final long offset = consumer.position(tp); > committedOffsets.put(tp, offset); > }{code} > Here is the exception: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: You can only > check the position for partitions assigned to this consumer. > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620) > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586) > at > org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275) > at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148) > at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69){code} > > As you can see in the commented code in my snippet, we used to block for > assignment with a poll(0), which is now deprecated. > It seems reasonable to me for position() to do the same thing that poll() > does, which is call `coordinator.poll(timeout.toMillis())` early in > processing to ensure an up-to-date assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)