[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301019#comment-16301019
 ] 

ASF GitHub Bot commented on KAFKA-4879:
---------------------------------------

hachikuji closed pull request #3637: KAFKA-4879 KafkaConsumer.position may hang 
forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b1badefd318..992b0711670 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -116,9 +116,9 @@
     public void seekToEnd(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#position(TopicPartition)
+     * @see KafkaConsumer#position(TopicPartition, long)
      */
-    public long position(TopicPartition partition);
+    public long position(TopicPartition partition, long timeout);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3154061bf01..5e287f3b114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1079,8 +1079,9 @@ public void assign(Collection<TopicPartition> partitions) {
 
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for
+        //TODO we need to clarify this
         if (!subscriptions.hasAllFetchPositions())
-            updateFetchPositions(this.subscriptions.missingFetchPositions());
+            updateFetchPositions(this.subscriptions.missingFetchPositions(), timeout);
 
         // if data is available already, return it immediately
         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
@@ -1295,6 +1296,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
      * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
      *
      * @param partition The partition to get the position for
+     * @param timeout The amount of time to wait for the offset
      * @return The offset
      * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
      *             the partition
@@ -1305,8 +1307,10 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException: if the given partition is not available
      */
-    public long position(TopicPartition partition) {
+    public long position(TopicPartition partition, long timeout) {
         acquireAndEnsureOpen();
         try {
             if (!this.subscriptions.isAssigned(partition))
@@ -1314,7 +1318,7 @@ public long position(TopicPartition partition) {
             Long offset = this.subscriptions.position(partition);
             if (offset == null) {
                 // batch update fetch positions for any partitions without a valid position
-                updateFetchPositions(subscriptions.assignedPartitions());
+                updateFetchPositions(subscriptions.assignedPartitions(), timeout);
                 offset = this.subscriptions.position(partition);
             }
             return offset;
@@ -1645,15 +1649,18 @@ private void close(long timeoutMs, boolean swallowException) {
      * or reset it using the offset reset policy the user has configured.
      *
      * @param partitions The partitions that needs updating fetch positions
+     * @param timeout The amount of time to wait for the offset
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no offset reset policy is
      *             defined
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available
      */
-    private void updateFetchPositions(Set<TopicPartition> partitions) {
+    private void updateFetchPositions(Set<TopicPartition> partitions, long timeout) {
         // lookup any positions for partitions which are awaiting reset (which may be the
         // case if the user called seekToBeginning or seekToEnd. We do this check first to
         // avoid an unnecessary lookup of committed offsets (which typically occurs when
         // the user is manually assigning partitions and managing their own offsets).
-        fetcher.resetOffsetsIfNeeded(partitions);
+        fetcher.resetOffsetsIfNeeded(partitions, timeout);
 
         if (!subscriptions.hasAllFetchPositions(partitions)) {
             // if we still don't have offsets for the given partitions, then we should either
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 91cb6f1ce7a..303ca62de51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -241,7 +241,7 @@ public synchronized OffsetAndMetadata committed(TopicPartition partition) {
     }
 
     @Override
-    public synchronized long position(TopicPartition partition) {
+    public synchronized long position(TopicPartition partition, long timeout) {
         ensureNotClosed();
         if (!this.subscriptions.isAssigned(partition))
             throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 23c59020a57..61743d0344c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -243,15 +243,18 @@ public void onFailure(RuntimeException e) {
     /**
      * Lookup and set offsets for any partitions which are awaiting an explicit reset.
      * @param partitions the partitions to reset
+     * @param timeout The amount of time to wait for the offset
+     *
+     * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available
      */
-    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
+    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions, long timeout) {
         final Set<TopicPartition> needsOffsetReset = new HashSet<>();
         for (TopicPartition tp : partitions) {
             if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
                 needsOffsetReset.add(tp);
         }
         if (!needsOffsetReset.isEmpty()) {
-            resetOffsets(needsOffsetReset);
+            resetOffsets(needsOffsetReset, timeout);
         }
     }
 
@@ -281,7 +284,7 @@ public void updateFetchPositions(Set<TopicPartition> partitions) {
         }
 
         if (!needsOffsetReset.isEmpty()) {
-            resetOffsets(needsOffsetReset);
+            resetOffsets(needsOffsetReset, Long.MAX_VALUE);
         }
     }
 
@@ -400,15 +403,17 @@ else if (strategy == OffsetResetStrategy.LATEST)
      * Reset offsets for the given partition using the offset reset strategy.
      *
      * @param partitions  The partitions that need offsets reset
+     * @param timeout The amount of time to wait for the offset
      * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     * @throws org.apache.kafka.common.errors.TimeoutException: if one of the given partition is not available
      */
-    private void resetOffsets(final Set<TopicPartition> partitions) {
+    private void resetOffsets(final Set<TopicPartition> partitions, long timeout) {
         final Map<TopicPartition, Long> offsetResets = new HashMap<>();
         final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
         for (final TopicPartition partition : partitions) {
             offsetResetStrategyTimestamp(partition, offsetResets, partitionsWithNoOffsets);
         }
-        final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false);
+        final Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes(offsetResets, timeout, false);
         for (final TopicPartition partition : partitions) {
             final OffsetData offsetData = offsetsByTimes.get(partition);
             if (offsetData == null) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9fd7e19810e..42d0c5a2944 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -470,7 +470,7 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
 
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(5, records.count());
-        assertEquals(55L, consumer.position(tp0));
+        assertEquals(55L, consumer.position(tp0, Long.MAX_VALUE));
         consumer.close(0, TimeUnit.MILLISECONDS);
     }
 
@@ -697,7 +697,7 @@ public void testWakeupWithFetchDataAvailable() throws Exception {
         }
 
         // make sure the position hasn't been updated
-        assertEquals(0, consumer.position(tp0));
+        assertEquals(0, consumer.position(tp0, Long.MAX_VALUE));
 
         // the next poll should return the completed fetch
         ConsumerRecords<String, String> records = consumer.poll(0);
@@ -861,8 +861,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled() {
 
         // verify that the fetch occurred as expected
         assertEquals(11, records.count());
-        assertEquals(1L, consumer.position(tp0));
-        assertEquals(10L, consumer.position(t2p0));
+        assertEquals(1L, consumer.position(tp0, Long.MAX_VALUE));
+        assertEquals(10L, consumer.position(t2p0, Long.MAX_VALUE));
 
         // subscription change
         consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
@@ -892,8 +892,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled() {
 
         // verify that the fetch occurred as expected
         assertEquals(101, records.count());
-        assertEquals(2L, consumer.position(tp0));
-        assertEquals(100L, consumer.position(t3p0));
+        assertEquals(2L, consumer.position(tp0, Long.MAX_VALUE));
+        assertEquals(100L, consumer.position(t3p0, Long.MAX_VALUE));
 
         // verify that the offset commits occurred as expected
         assertTrue(commitReceived.get());
@@ -1037,7 +1037,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() {
 
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(1, records.count());
-        assertEquals(11L, consumer.position(tp0));
+        assertEquals(11L, consumer.position(tp0, Long.MAX_VALUE));
 
         // mock the offset commit response for to be revoked partitions
         AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);
@@ -1102,7 +1102,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() {
 
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(1, records.count());
-        assertEquals(11L, consumer.position(tp0));
+        assertEquals(11L, consumer.position(tp0, Long.MAX_VALUE));
 
         // new manual assignment
         consumer.assign(Arrays.asList(t2p0));
@@ -1170,8 +1170,8 @@ public void testOffsetOfPausedPartitions() {
         offsetResponse.put(tp0, 3L);
         offsetResponse.put(tp1, 3L);
         client.prepareResponse(listOffsetsResponse(offsetResponse, Errors.NONE));
-        assertEquals(3L, consumer.position(tp0));
-        assertEquals(3L, consumer.position(tp1));
+        assertEquals(3L, consumer.position(tp0, Long.MAX_VALUE));
+        assertEquals(3L, consumer.position(tp1, Long.MAX_VALUE));
 
         client.requests().clear();
         consumer.unsubscribe();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 6ed46f711cb..ba6422f782e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -52,7 +52,7 @@ public void testSimpleMock() {
         assertEquals(rec1, iter.next());
         assertEquals(rec2, iter.next());
         assertFalse(iter.hasNext());
-        assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
+        assertEquals(2L, consumer.position(new TopicPartition("test", 0), Long.MAX_VALUE));
         consumer.commitSync();
         assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c4567a3693b..3d50fd0da48 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -492,7 +492,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             lastCommittedOffsets = new HashMap<>();
             currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
-                long pos = consumer.position(tp);
+                long pos = consumer.position(tp, Long.MAX_VALUE);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                 currentOffsets.put(tp, new OffsetAndMetadata(pos));
                 log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 0e190bc3767..c323360375b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -275,7 +275,7 @@ private void readToLogEnd() {
             Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
+                if (consumer.position(entry.getKey(), Long.MAX_VALUE) >= entry.getValue())
                     it.remove();
                 else {
                     poll(Integer.MAX_VALUE);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index eae3726aea7..d1943e8b7c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -340,8 +340,8 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception {
         sinkTask.close(new HashSet<>(partitions));
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
 
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
@@ -722,8 +722,8 @@ private void expectRebalanceAssignmentError(RuntimeException e) {
         sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
         EasyMock.expectLastCall().andReturn(Collections.emptyMap());
 
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
 
         sinkTask.open(partitions);
         EasyMock.expectLastCall().andThrow(e);
@@ -752,8 +752,8 @@ private void expectPollInitialAssignment() {
                 return ConsumerRecords.empty();
             }
         });
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
 
         sinkTask.put(Collections.<SinkRecord>emptyList());
         EasyMock.expectLastCall();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 6f77f650c8d..e718806e7db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -516,9 +516,9 @@ private void expectPollInitialAssignment() throws Exception {
                 return ConsumerRecords.empty();
             }
         });
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION3, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
 
         sinkTask.put(Collections.<SinkRecord>emptyList());
         EasyMock.expectLastCall();
@@ -630,9 +630,9 @@ public SinkRecord answer() {
                     }
                 });
 
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION3, Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
 
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index b2c164dc1c9..37029f86173 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -247,8 +247,8 @@ public void run() {
         store.start();
 
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(7L, consumer.position(TP0));
-        assertEquals(7L, consumer.position(TP1));
+        assertEquals(7L, consumer.position(TP0, Long.MAX_VALUE));
+        assertEquals(7L, consumer.position(TP1, Long.MAX_VALUE));
 
         store.stop();
 
@@ -283,8 +283,8 @@ public void testSendAndReadToEnd() throws Exception {
         consumer.updateEndOffsets(endOffsets);
         store.start();
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(0L, consumer.position(TP0));
-        assertEquals(0L, consumer.position(TP1));
+        assertEquals(0L, consumer.position(TP0, Long.MAX_VALUE));
+        assertEquals(0L, consumer.position(TP1, Long.MAX_VALUE));
 
         // Set some keys
         final AtomicInteger invoked = new AtomicInteger(0);
@@ -412,7 +412,7 @@ public void run() {
         store.start();
         assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(1L, consumer.position(TP0));
+        assertEquals(1L, consumer.position(TP0, Long.MAX_VALUE));
 
         store.stop();
 
@@ -439,8 +439,8 @@ public void testProducerError() throws Exception {
         consumer.updateEndOffsets(endOffsets);
         store.start();
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-        assertEquals(0L, consumer.position(TP0));
-        assertEquals(0L, consumer.position(TP1));
+        assertEquals(0L, consumer.position(TP0, Long.MAX_VALUE));
+        assertEquals(0L, consumer.position(TP1, Long.MAX_VALUE));
 
         final AtomicReference<Throwable> setException = new AtomicReference<>();
         store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 55989f46e6d..1e35f86b0bd 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -244,7 +244,7 @@ private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
 
             if (!dryRun) {
                 for (final TopicPartition p : partitions) {
-                    client.position(p);
+                    client.position(p, Long.MAX_VALUE);
                 }
                 client.commitSync();
             }
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1a4de997bc7..c8122ef41bf 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -784,21 +784,21 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test(expected = classOf[AuthorizationException])
   def testOffsetFetchWithNoAccess() {
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Long.MaxValue)
   }
 
   @Test(expected = classOf[GroupAuthorizationException])
   def testOffsetFetchWithNoGroupAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Long.MaxValue)
   }
 
   @Test(expected = classOf[KafkaException])
   def testOffsetFetchWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Long.MaxValue)
   }
 
   @Test
@@ -834,7 +834,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Long.MaxValue)
   }
 
   @Test
@@ -842,7 +842,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     this.consumers.head.assign(List(tp).asJava)
-    this.consumers.head.position(tp)
+    this.consumers.head.position(tp, Long.MaxValue)
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 27cafd70614..db1f84ab3d3 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -115,9 +115,9 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
 
       if (records.nonEmpty) {
         consumer.commitSync()
-        assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+        assertEquals(consumer.position(tp, Long.MaxValue), consumer.committed(tp).offset)
 
-        if (consumer.position(tp) == numRecords) {
+        if (consumer.position(tp, Long.MaxValue) == numRecords) {
          consumer.seekToBeginning(Collections.emptyList())
          consumed = 0
        }
@@ -151,16 +151,16 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
      if (coin == 0) {
        info("Seeking to end of log")
        consumer.seekToEnd(Collections.emptyList())
-        assertEquals(numRecords.toLong, consumer.position(tp))
+        assertEquals(numRecords.toLong, consumer.position(tp, Long.MaxValue))
      } else if (coin == 1) {
        val pos = TestUtils.random.nextInt(numRecords).toLong
        info("Seeking to " + pos)
        consumer.seek(tp, pos)
-        assertEquals(pos, consumer.position(tp))
+        assertEquals(pos, consumer.position(tp, Long.MaxValue))
      } else if (coin == 2) {
        info("Committing offset.")
        consumer.commitSync()
-        assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+        assertEquals(consumer.position(tp, Long.MaxValue), consumer.committed(tp).offset)
      }
    }
  }
diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index c9c40a9e58f..ff282e850d8 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -85,15 +85,15 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging {
 
     sendRecords(producers.head, 10, tp)
     consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(0L, consumer.position(tp))
+    assertEquals(0L, consumer.position(tp, Long.MaxValue))
 
     client.deleteRecordsBefore(Map((tp, 5L))).get()
     consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(5L, consumer.position(tp))
+    assertEquals(5L, consumer.position(tp, Long.MaxValue))
 
     client.deleteRecordsBefore(Map((tp, DeleteRecordsRequest.HIGH_WATERMARK))).get()
     consumer.seekToBeginning(Collections.singletonList(tp))
-    assertEquals(10L, consumer.position(tp))
+    assertEquals(10L, consumer.position(tp, Long.MaxValue))
   }
 
   @Test
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index aad9b6ad24a..ffe36a789fc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -190,7 +190,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
           // than session timeout and then try a commit. We should still be in the group,
           // so the commit should succeed
           Utils.sleep(1500)
-          committedPosition = consumer0.position(tp)
+          committedPosition = consumer0.position(tp, Long.MaxValue)
           consumer0.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava)
           commitCompleted = true
         }
@@ -583,15 +583,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(List(tp).asJava)
-    assertEquals(totalRecords, consumer.position(tp))
+    assertEquals(totalRecords, consumer.position(tp, Long.MaxValue))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
     consumer.seekToBeginning(List(tp).asJava)
-    assertEquals(0, consumer.position(tp), 0)
+    assertEquals(0, consumer.position(tp, Long.MaxValue), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
     consumer.seek(tp, mid)
-    assertEquals(mid, consumer.position(tp))
+    assertEquals(mid, consumer.position(tp, Long.MaxValue))
 
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
       startingTimestamp = mid.toLong)
@@ -601,15 +601,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp2).asJava)
 
     consumer.seekToEnd(List(tp2).asJava)
-    assertEquals(totalRecords, consumer.position(tp2))
+    assertEquals(totalRecords, consumer.position(tp2, Long.MaxValue))
     assertFalse(consumer.poll(totalRecords).iterator().hasNext)
 
     consumer.seekToBeginning(List(tp2).asJava)
-    assertEquals(0, consumer.position(tp2), 0)
+    assertEquals(0, consumer.position(tp2, Long.MaxValue), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
 
     consumer.seek(tp2, mid)
-    assertEquals(mid, consumer.position(tp2))
+    assertEquals(mid, consumer.position(tp2, Long.MaxValue))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
       startingTimestamp = mid.toLong, tp = tp2)
   }
@@ -634,17 +634,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // position() on a partition that we aren't subscribed to throws an exception
     intercept[IllegalArgumentException] {
-      this.consumers.head.position(new TopicPartition(topic, 15))
+      this.consumers.head.position(new TopicPartition(topic, 15), Long.MaxValue)
     }
 
     this.consumers.head.assign(List(tp).asJava)
 
-    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp))
+    assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers.head.position(tp, Long.MaxValue))
     this.consumers.head.commitSync()
     assertEquals(0L, this.consumers.head.committed(tp).offset)
 
     consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5, startingOffset = 0)
-    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp))
+    assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers.head.position(tp, Long.MaxValue))
     this.consumers.head.commitSync()
    assertEquals("Committed offset should be returned", 5L, this.consumers.head.committed(tp).offset)
 
@@ -1309,15 +1309,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // Need to poll to join the group
     this.consumers.head.poll(50)
-    val pos1 = this.consumers.head.position(tp)
-    val pos2 = this.consumers.head.position(tp2)
+    val pos1 = this.consumers.head.position(tp, Long.MaxValue)
+    val pos2 = this.consumers.head.position(tp2, Long.MaxValue)
     this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
     assertEquals(3, this.consumers.head.committed(tp).offset)
     assertNull(this.consumers.head.committed(tp2))
 
     // Positions should not change
-    assertEquals(pos1, this.consumers.head.position(tp))
-    assertEquals(pos2, this.consumers.head.position(tp2))
+    assertEquals(pos1, this.consumers.head.position(tp, Long.MaxValue))
+    assertEquals(pos2, this.consumers.head.position(tp2, Long.MaxValue))
     this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
     assertEquals(3, this.consumers.head.committed(tp).offset)
     assertEquals(5, this.consumers.head.committed(tp2).offset)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 760cc39a974..fbd7025da74 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -168,7 +168,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     assertEquals(2, readCommittedConsumer.assignment.size)
     readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
     readCommittedConsumer.assignment.asScala.foreach { tp =>
-      assertEquals(1L, readCommittedConsumer.position(tp))
+      assertEquals(1L, readCommittedConsumer.position(tp, Long.MaxValue))
     }
 
     // undecided timestamps should not be searchable either
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ad641c01fdb..60d10e6468b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1436,7 +1436,7 @@ object TestUtils extends Logging {
   def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) : Map[TopicPartition, OffsetAndMetadata]  = {
     val offsetsToCommit = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
     consumer.assignment.foreach{ topicPartition =>
-      offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition)))
+      offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition, Long.MaxValue)))
     }
     offsetsToCommit.toMap
   }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index d9205a0c425..479930a5f04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -169,7 +169,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
                 consumer.seekToBeginning(Collections.singletonList(topicPartition));
             }
 
-            long offset = consumer.position(topicPartition);
+            long offset = consumer.position(topicPartition, Long.MAX_VALUE);
             final Long highWatermark = highWatermarks.get(topicPartition);
             BatchingStateRestoreCallback
                 stateRestoreAdapter =
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 842721db533..84a650027a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -137,7 +137,7 @@ public void restore() {
                     logRestoreOffsets(restorer.partition(),
                                       restorer.checkpoint(),
                                       endOffsets.get(restorer.partition()));
-                    restorer.setStartingOffset(consumer.position(restorer.partition()));
+                    restorer.setStartingOffset(consumer.position(restorer.partition(), Long.MAX_VALUE));
                 } else {
                     consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                     needsPositionUpdate.add(restorer);
@@ -145,7 +145,7 @@ public void restore() {
             }
 
             for (final StateRestorer restorer : needsPositionUpdate) {
-                final long position = consumer.position(restorer.partition());
+                final long position = consumer.position(restorer.partition(), Long.MAX_VALUE);
                 restorer.setStartingOffset(position);
                 logRestoreOffsets(restorer.partition(),
                                   position,
@@ -234,7 +234,7 @@ private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
         }
 
         if (nextPosition == -1) {
-            nextPosition = consumer.position(restorer.partition());
+            nextPosition = consumer.position(restorer.partition(), Long.MAX_VALUE);
         }
 
         if (!restoreRecords.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 8ab6052014a..d173c4995f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -61,8 +61,8 @@ public void shouldAssignPartitionsToConsumer() throws Exception {
     @Test
     public void shouldSeekToInitialOffsets() throws Exception {
         stateConsumer.initialize();
-        assertEquals(20L, consumer.position(topicOne));
-        assertEquals(30L, consumer.position(topicTwo));
+        assertEquals(20L, consumer.position(topicOne, Long.MAX_VALUE));
+        assertEquals(30L, consumer.position(topicTwo, Long.MAX_VALUE));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index af047a79ccf..0ddf302f1d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -228,7 +228,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
             }
 
             for (final TopicPartition tp : partitions) {
-                final long offset = consumer.position(tp);
+                final long offset = consumer.position(tp, Long.MAX_VALUE);
                 committedOffsets.put(tp, offset);
             }
         }
@@ -257,7 +257,7 @@ private static void ensureStreamsApplicationDown(final String kafka) {
                     throw new RuntimeException("FAIL: did receive more records than expected for " + tp
                        + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
                }
-                if (consumer.position(tp) >= readEndOffset) {
+                if (consumer.position(tp, Long.MAX_VALUE) >= readEndOffset) {
                    consumer.pause(Collections.singletonList(tp));
                }
            }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index ce889170bbf..67ce7b75c35 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -105,7 +105,7 @@ public synchronized void assign(Collection<TopicPartition> partitions) {
     }
 
     @Override
-    public synchronized long position(TopicPartition partition) {
+    public synchronized long position(TopicPartition partition, long timeout) {
         if (!partition.equals(assignedPartition))
             throw new IllegalStateException("RestoreConsumer: unassigned partition");
 
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 4c21e5448a6..d0e33160312 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -420,7 +420,7 @@ public synchronized void seekToEnd(final Collection<TopicPartition> partitions)
             public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
 
             @Override
-            public synchronized long position(final TopicPartition partition) {
+            public synchronized long position(final TopicPartition partition, final long timeout) {
                 return 0L;
             }
         };
@@ -447,7 +447,7 @@ public synchronized void seekToEnd(final Collection<TopicPartition> partitions)
             public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {}
 
             @Override
-            public synchronized long position(final TopicPartition partition) {
+            public synchronized long position(final TopicPartition partition, final long timeout) {
                 return 0L;
             }
         };
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 3903a3a8fae..de6b967b8a4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -183,7 +183,7 @@ private static ArgumentParser argParser() {
     private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> consumer) {
         Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
         for (TopicPartition topicPartition : consumer.assignment()) {
-            positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
+            positions.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition, Long.MAX_VALUE), null));
         }
         return positions;
     }
@@ -199,7 +199,7 @@ private static void resetToLastCommittedPositions(KafkaConsumer<String, String>
     }
 
     private static long messagesRemaining(KafkaConsumer<String, String> consumer, TopicPartition partition) {
-        long currentPosition = consumer.position(partition);
+        long currentPosition = consumer.position(partition, Long.MAX_VALUE);
         Map<TopicPartition, Long> endOffsets = consumer.endOffsets(singleton(partition));
         if (endOffsets.containsKey(partition)) {
             return endOffsets.get(partition) - currentPosition;


 

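For context on the API change above: a caller migrating to the new two-argument position overload might bound the wait as in the sketch below. This is a minimal illustration, not part of the patch; the two-argument signature and the TimeoutException come from the diff, while the timeout unit (milliseconds), the topic name, and the error handling are assumptions.

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class TimedPositionExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testgroup");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("test", 0);
    consumer.assign(Collections.singletonList(tp));
    try {
      // With the patch, position() takes an explicit timeout instead of
      // blocking on Long.MAX_VALUE internally, so a deleted topic can
      // surface as an exception rather than a hang.
      long offset = consumer.position(tp, 30000L); // assumed: milliseconds
      System.out.println(tp + " offset: " + offset);
    } catch (TimeoutException e) {
      // The partition never became available (e.g. the topic was deleted).
      System.err.println("position() timed out for " + tp + ": " + e.getMessage());
    } finally {
      consumer.close();
    }
  }
}
{code}

Note that the existing call sites changed by the patch pass Long.MAX_VALUE to preserve the old blocking behavior, which is why the test and Connect changes above are mechanical.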


> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
>                 Key: KAFKA-4879
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4879
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Shixiong Zhu
>            Assignee: Jason Gustafson
>             Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}
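
With the patched API, the final loop of the reproducer above can bound its wait instead of hanging; a hypothetical adaptation (the 30-second timeout and the catch block are illustrative assumptions, not part of the original report):

{code}
    for (TopicPartition p : partitions) {
      try {
        // Fail fast instead of retrying forever once the topic is deleted.
        System.out.println(p + " offset: " + kc.position(p, 30000L));
      } catch (org.apache.kafka.common.errors.TimeoutException e) {
        System.out.println(p + " position unavailable: " + e.getMessage());
      }
    }
{code}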



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
