[
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301019#comment-16301019
]
ASF GitHub Bot commented on KAFKA-4879:
---------------------------------------
hachikuji closed pull request #3637: KAFKA-4879 KafkaConsumer.position may hang
forever when deleting a topic
URL: https://github.com/apache/kafka/pull/3637
This pull request comes from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it has been merged, the
diff is reproduced below for the sake of provenance:
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b1badefd318..992b0711670 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -116,9 +116,9 @@
public void seekToEnd(Collection<TopicPartition> partitions);
/**
- * @see KafkaConsumer#position(TopicPartition)
+ * @see KafkaConsumer#position(TopicPartition, long)
*/
- public long position(TopicPartition partition);
+ public long position(TopicPartition partition, long timeout);
/**
* @see KafkaConsumer#committed(TopicPartition)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3154061bf01..5e287f3b114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1079,8 +1079,9 @@ public void assign(Collection<TopicPartition> partitions)
{
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
+ //TODO we need to clarify this
if (!subscriptions.hasAllFetchPositions())
- updateFetchPositions(this.subscriptions.missingFetchPositions());
+ updateFetchPositions(this.subscriptions.missingFetchPositions(),
timeout);
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
fetcher.fetchedRecords();
@@ -1295,6 +1296,7 @@ public void seekToEnd(Collection<TopicPartition>
partitions) {
* Get the offset of the <i>next record</i> that will be fetched (if a
record with that offset exists).
*
* @param partition The partition to get the position for
+ * @param timeout The amount of time to wait for the offset
* @return The offset
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no
offset is currently defined for
* the partition
@@ -1305,8 +1307,10 @@ public void seekToEnd(Collection<TopicPartition>
partitions) {
* @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic or to the
* configured groupId
* @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ *
+ * @throws org.apache.kafka.common.errors.TimeoutException if the given
partition is not available
*/
- public long position(TopicPartition partition) {
+ public long position(TopicPartition partition, long timeout) {
acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
@@ -1314,7 +1318,7 @@ public long position(TopicPartition partition) {
Long offset = this.subscriptions.position(partition);
if (offset == null) {
// batch update fetch positions for any partitions without a
valid position
- updateFetchPositions(subscriptions.assignedPartitions());
+ updateFetchPositions(subscriptions.assignedPartitions(),
timeout);
offset = this.subscriptions.position(partition);
}
return offset;
@@ -1645,15 +1649,18 @@ private void close(long timeoutMs, boolean
swallowException) {
* or reset it using the offset reset policy the user has configured.
*
* @param partitions The partitions that needs updating fetch positions
+ * @param timeout The amount of time to wait for the offset
* @throws NoOffsetForPartitionException If no offset is stored for a
given partition and no offset reset policy is
* defined
+ *
+ * @throws org.apache.kafka.common.errors.TimeoutException if one of the
given partitions is not available
*/
- private void updateFetchPositions(Set<TopicPartition> partitions) {
+ private void updateFetchPositions(Set<TopicPartition> partitions, long
timeout) {
// lookup any positions for partitions which are awaiting reset (which
may be the
// case if the user called seekToBeginning or seekToEnd. We do this
check first to
// avoid an unnecessary lookup of committed offsets (which typically
occurs when
// the user is manually assigning partitions and managing their own
offsets).
- fetcher.resetOffsetsIfNeeded(partitions);
+ fetcher.resetOffsetsIfNeeded(partitions, timeout);
if (!subscriptions.hasAllFetchPositions(partitions)) {
// if we still don't have offsets for the given partitions, then
we should either
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 91cb6f1ce7a..303ca62de51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -241,7 +241,7 @@ public synchronized OffsetAndMetadata
committed(TopicPartition partition) {
}
@Override
- public synchronized long position(TopicPartition partition) {
+ public synchronized long position(TopicPartition partition, long timeout) {
ensureNotClosed();
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the
position for partitions assigned to this consumer.");
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 23c59020a57..61743d0344c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -243,15 +243,18 @@ public void onFailure(RuntimeException e) {
/**
* Lookup and set offsets for any partitions which are awaiting an
explicit reset.
* @param partitions the partitions to reset
+ * @param timeout The amount of time to wait for the offset
+ *
+ * @throws org.apache.kafka.common.errors.TimeoutException if one of the
given partitions is not available
*/
- public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
+ public void resetOffsetsIfNeeded(Set<TopicPartition> partitions, long
timeout) {
final Set<TopicPartition> needsOffsetReset = new HashSet<>();
for (TopicPartition tp : partitions) {
if (subscriptions.isAssigned(tp) &&
subscriptions.isOffsetResetNeeded(tp))
needsOffsetReset.add(tp);
}
if (!needsOffsetReset.isEmpty()) {
- resetOffsets(needsOffsetReset);
+ resetOffsets(needsOffsetReset, timeout);
}
}
@@ -281,7 +284,7 @@ public void updateFetchPositions(Set<TopicPartition>
partitions) {
}
if (!needsOffsetReset.isEmpty()) {
- resetOffsets(needsOffsetReset);
+ resetOffsets(needsOffsetReset, Long.MAX_VALUE);
}
}
@@ -400,15 +403,17 @@ else if (strategy == OffsetResetStrategy.LATEST)
* Reset offsets for the given partition using the offset reset strategy.
*
* @param partitions The partitions that need offsets reset
+ * @param timeout The amount of time to wait for the offset
* @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException
If no offset reset strategy is defined
+ * @throws org.apache.kafka.common.errors.TimeoutException if one of the
given partitions is not available
*/
- private void resetOffsets(final Set<TopicPartition> partitions) {
+ private void resetOffsets(final Set<TopicPartition> partitions, long
timeout) {
final Map<TopicPartition, Long> offsetResets = new HashMap<>();
final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
for (final TopicPartition partition : partitions) {
offsetResetStrategyTimestamp(partition, offsetResets,
partitionsWithNoOffsets);
}
- final Map<TopicPartition, OffsetData> offsetsByTimes =
retrieveOffsetsByTimes(offsetResets, Long.MAX_VALUE, false);
+ final Map<TopicPartition, OffsetData> offsetsByTimes =
retrieveOffsetsByTimes(offsetResets, timeout, false);
for (final TopicPartition partition : partitions) {
final OffsetData offsetData = offsetsByTimes.get(partition);
if (offsetData == null) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9fd7e19810e..42d0c5a2944 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -470,7 +470,7 @@ public void
verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(5, records.count());
- assertEquals(55L, consumer.position(tp0));
+ assertEquals(55L, consumer.position(tp0, Long.MAX_VALUE));
consumer.close(0, TimeUnit.MILLISECONDS);
}
@@ -697,7 +697,7 @@ public void testWakeupWithFetchDataAvailable() throws
Exception {
}
// make sure the position hasn't been updated
- assertEquals(0, consumer.position(tp0));
+ assertEquals(0, consumer.position(tp0, Long.MAX_VALUE));
// the next poll should return the completed fetch
ConsumerRecords<String, String> records = consumer.poll(0);
@@ -861,8 +861,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled() {
// verify that the fetch occurred as expected
assertEquals(11, records.count());
- assertEquals(1L, consumer.position(tp0));
- assertEquals(10L, consumer.position(t2p0));
+ assertEquals(1L, consumer.position(tp0, Long.MAX_VALUE));
+ assertEquals(10L, consumer.position(t2p0, Long.MAX_VALUE));
// subscription change
consumer.subscribe(Arrays.asList(topic, topic3),
getConsumerRebalanceListener(consumer));
@@ -892,8 +892,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled() {
// verify that the fetch occurred as expected
assertEquals(101, records.count());
- assertEquals(2L, consumer.position(tp0));
- assertEquals(100L, consumer.position(t3p0));
+ assertEquals(2L, consumer.position(tp0, Long.MAX_VALUE));
+ assertEquals(100L, consumer.position(t3p0, Long.MAX_VALUE));
// verify that the offset commits occurred as expected
assertTrue(commitReceived.get());
@@ -1037,7 +1037,7 @@ public void
testManualAssignmentChangeWithAutoCommitEnabled() {
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(1, records.count());
- assertEquals(11L, consumer.position(tp0));
+ assertEquals(11L, consumer.position(tp0, Long.MAX_VALUE));
// mock the offset commit response for to be revoked partitions
AtomicBoolean commitReceived = prepareOffsetCommitResponse(client,
coordinator, tp0, 11);
@@ -1102,7 +1102,7 @@ public void
testManualAssignmentChangeWithAutoCommitDisabled() {
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(1, records.count());
- assertEquals(11L, consumer.position(tp0));
+ assertEquals(11L, consumer.position(tp0, Long.MAX_VALUE));
// new manual assignment
consumer.assign(Arrays.asList(t2p0));
@@ -1170,8 +1170,8 @@ public void testOffsetOfPausedPartitions() {
offsetResponse.put(tp0, 3L);
offsetResponse.put(tp1, 3L);
client.prepareResponse(listOffsetsResponse(offsetResponse,
Errors.NONE));
- assertEquals(3L, consumer.position(tp0));
- assertEquals(3L, consumer.position(tp1));
+ assertEquals(3L, consumer.position(tp0, Long.MAX_VALUE));
+ assertEquals(3L, consumer.position(tp1, Long.MAX_VALUE));
client.requests().clear();
consumer.unsubscribe();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 6ed46f711cb..ba6422f782e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -52,7 +52,7 @@ public void testSimpleMock() {
assertEquals(rec1, iter.next());
assertEquals(rec2, iter.next());
assertFalse(iter.hasNext());
- assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
+ assertEquals(2L, consumer.position(new TopicPartition("test", 0),
Long.MAX_VALUE));
consumer.commitSync();
assertEquals(2L, consumer.committed(new TopicPartition("test",
0)).offset());
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index c4567a3693b..3d50fd0da48 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -492,7 +492,7 @@ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
lastCommittedOffsets = new HashMap<>();
currentOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
- long pos = consumer.position(tp);
+ long pos = consumer.position(tp, Long.MAX_VALUE);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
currentOffsets.put(tp, new OffsetAndMetadata(pos));
log.debug("{} assigned topic partition {} with offset {}", id,
tp, pos);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 0e190bc3767..c323360375b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -275,7 +275,7 @@ private void readToLogEnd() {
Iterator<Map.Entry<TopicPartition, Long>> it =
endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
+ if (consumer.position(entry.getKey(), Long.MAX_VALUE) >=
entry.getValue())
it.remove();
else {
poll(Integer.MAX_VALUE);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index eae3726aea7..d1943e8b7c3 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -340,8 +340,8 @@ public void testWakeupInCommitSyncCausesRetry() throws
Exception {
sinkTask.close(new HashSet<>(partitions));
EasyMock.expectLastCall();
-
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION2,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
sinkTask.open(partitions);
EasyMock.expectLastCall();
@@ -722,8 +722,8 @@ private void
expectRebalanceAssignmentError(RuntimeException e) {
sinkTask.preCommit(EasyMock.<Map<TopicPartition,
OffsetAndMetadata>>anyObject());
EasyMock.expectLastCall().andReturn(Collections.emptyMap());
-
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION2,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
sinkTask.open(partitions);
EasyMock.expectLastCall().andThrow(e);
@@ -752,8 +752,8 @@ private void expectPollInitialAssignment() {
return ConsumerRecords.empty();
}
});
-
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION2,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
sinkTask.put(Collections.<SinkRecord>emptyList());
EasyMock.expectLastCall();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 6f77f650c8d..e718806e7db 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -516,9 +516,9 @@ private void expectPollInitialAssignment() throws Exception
{
return ConsumerRecords.empty();
}
});
-
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION2,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION3,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
sinkTask.put(Collections.<SinkRecord>emptyList());
EasyMock.expectLastCall();
@@ -630,9 +630,9 @@ public SinkRecord answer() {
}
});
-
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-
EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION2,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION3,
Long.MAX_VALUE)).andReturn(FIRST_OFFSET);
sinkTask.open(partitions);
EasyMock.expectLastCall();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index b2c164dc1c9..37029f86173 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -247,8 +247,8 @@ public void run() {
store.start();
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
- assertEquals(7L, consumer.position(TP0));
- assertEquals(7L, consumer.position(TP1));
+ assertEquals(7L, consumer.position(TP0, Long.MAX_VALUE));
+ assertEquals(7L, consumer.position(TP1, Long.MAX_VALUE));
store.stop();
@@ -283,8 +283,8 @@ public void testSendAndReadToEnd() throws Exception {
consumer.updateEndOffsets(endOffsets);
store.start();
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
- assertEquals(0L, consumer.position(TP0));
- assertEquals(0L, consumer.position(TP1));
+ assertEquals(0L, consumer.position(TP0, Long.MAX_VALUE));
+ assertEquals(0L, consumer.position(TP1, Long.MAX_VALUE));
// Set some keys
final AtomicInteger invoked = new AtomicInteger(0);
@@ -412,7 +412,7 @@ public void run() {
store.start();
assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
- assertEquals(1L, consumer.position(TP0));
+ assertEquals(1L, consumer.position(TP0, Long.MAX_VALUE));
store.stop();
@@ -439,8 +439,8 @@ public void testProducerError() throws Exception {
consumer.updateEndOffsets(endOffsets);
store.start();
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
- assertEquals(0L, consumer.position(TP0));
- assertEquals(0L, consumer.position(TP1));
+ assertEquals(0L, consumer.position(TP0, Long.MAX_VALUE));
+ assertEquals(0L, consumer.position(TP1, Long.MAX_VALUE));
final AtomicReference<Throwable> setException = new
AtomicReference<>();
store.send(TP0_KEY, TP0_VALUE, new
org.apache.kafka.clients.producer.Callback() {
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 55989f46e6d..1e35f86b0bd 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -244,7 +244,7 @@ private void
maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
if (!dryRun) {
for (final TopicPartition p : partitions) {
- client.position(p);
+ client.position(p, Long.MAX_VALUE);
}
client.commitSync();
}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1a4de997bc7..c8122ef41bf 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -784,21 +784,21 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
@Test(expected = classOf[AuthorizationException])
def testOffsetFetchWithNoAccess() {
this.consumers.head.assign(List(tp).asJava)
- this.consumers.head.position(tp)
+ this.consumers.head.position(tp, Long.MaxValue)
}
@Test(expected = classOf[GroupAuthorizationException])
def testOffsetFetchWithNoGroupAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow,
Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
- this.consumers.head.position(tp)
+ this.consumers.head.position(tp, Long.MaxValue)
}
@Test(expected = classOf[KafkaException])
def testOffsetFetchWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow,
Acl.WildCardHost, Read)), groupResource)
this.consumers.head.assign(List(tp).asJava)
- this.consumers.head.position(tp)
+ this.consumers.head.position(tp, Long.MaxValue)
}
@Test
@@ -834,7 +834,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow,
Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow,
Acl.WildCardHost, Describe)), topicResource)
this.consumers.head.assign(List(tp).asJava)
- this.consumers.head.position(tp)
+ this.consumers.head.position(tp, Long.MaxValue)
}
@Test
@@ -842,7 +842,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow,
Acl.WildCardHost, Read)), groupResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow,
Acl.WildCardHost, Read)), topicResource)
this.consumers.head.assign(List(tp).asJava)
- this.consumers.head.position(tp)
+ this.consumers.head.position(tp, Long.MaxValue)
}
@Test
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 27cafd70614..db1f84ab3d3 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -115,9 +115,9 @@ class ConsumerBounceTest extends IntegrationTestHarness
with Logging {
if (records.nonEmpty) {
consumer.commitSync()
- assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+ assertEquals(consumer.position(tp, Long.MaxValue),
consumer.committed(tp).offset)
- if (consumer.position(tp) == numRecords) {
+ if (consumer.position(tp, Long.MaxValue) == numRecords) {
consumer.seekToBeginning(Collections.emptyList())
consumed = 0
}
@@ -151,16 +151,16 @@ class ConsumerBounceTest extends IntegrationTestHarness
with Logging {
if (coin == 0) {
info("Seeking to end of log")
consumer.seekToEnd(Collections.emptyList())
- assertEquals(numRecords.toLong, consumer.position(tp))
+ assertEquals(numRecords.toLong, consumer.position(tp, Long.MaxValue))
} else if (coin == 1) {
val pos = TestUtils.random.nextInt(numRecords).toLong
info("Seeking to " + pos)
consumer.seek(tp, pos)
- assertEquals(pos, consumer.position(tp))
+ assertEquals(pos, consumer.position(tp, Long.MaxValue))
} else if (coin == 2) {
info("Committing offset.")
consumer.commitSync()
- assertEquals(consumer.position(tp), consumer.committed(tp).offset)
+ assertEquals(consumer.position(tp, Long.MaxValue),
consumer.committed(tp).offset)
}
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
index c9c40a9e58f..ff282e850d8 100644
--- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala
@@ -85,15 +85,15 @@ class LegacyAdminClientTest extends IntegrationTestHarness
with Logging {
sendRecords(producers.head, 10, tp)
consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(0L, consumer.position(tp))
+ assertEquals(0L, consumer.position(tp, Long.MaxValue))
client.deleteRecordsBefore(Map((tp, 5L))).get()
consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(5L, consumer.position(tp))
+ assertEquals(5L, consumer.position(tp, Long.MaxValue))
client.deleteRecordsBefore(Map((tp,
DeleteRecordsRequest.HIGH_WATERMARK))).get()
consumer.seekToBeginning(Collections.singletonList(tp))
- assertEquals(10L, consumer.position(tp))
+ assertEquals(10L, consumer.position(tp, Long.MaxValue))
}
@Test
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index aad9b6ad24a..ffe36a789fc 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -190,7 +190,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// than session timeout and then try a commit. We should still be in
the group,
// so the commit should succeed
Utils.sleep(1500)
- committedPosition = consumer0.position(tp)
+ committedPosition = consumer0.position(tp, Long.MaxValue)
consumer0.commitSync(Map(tp -> new
OffsetAndMetadata(committedPosition)).asJava)
commitCompleted = true
}
@@ -583,15 +583,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.assign(List(tp).asJava)
consumer.seekToEnd(List(tp).asJava)
- assertEquals(totalRecords, consumer.position(tp))
+ assertEquals(totalRecords, consumer.position(tp, Long.MaxValue))
assertFalse(consumer.poll(totalRecords).iterator().hasNext)
consumer.seekToBeginning(List(tp).asJava)
- assertEquals(0, consumer.position(tp), 0)
+ assertEquals(0, consumer.position(tp, Long.MaxValue), 0)
consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
consumer.seek(tp, mid)
- assertEquals(mid, consumer.position(tp))
+ assertEquals(mid, consumer.position(tp, Long.MaxValue))
consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset =
mid.toInt, startingKeyAndValueIndex = mid.toInt,
startingTimestamp = mid.toLong)
@@ -601,15 +601,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.assign(List(tp2).asJava)
consumer.seekToEnd(List(tp2).asJava)
- assertEquals(totalRecords, consumer.position(tp2))
+ assertEquals(totalRecords, consumer.position(tp2, Long.MaxValue))
assertFalse(consumer.poll(totalRecords).iterator().hasNext)
consumer.seekToBeginning(List(tp2).asJava)
- assertEquals(0, consumer.position(tp2), 0)
+ assertEquals(0, consumer.position(tp2, Long.MaxValue), 0)
consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp =
tp2)
consumer.seek(tp2, mid)
- assertEquals(mid, consumer.position(tp2))
+ assertEquals(mid, consumer.position(tp2, Long.MaxValue))
consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset =
mid.toInt, startingKeyAndValueIndex = mid.toInt,
startingTimestamp = mid.toLong, tp = tp2)
}
@@ -634,17 +634,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// position() on a partition that we aren't subscribed to throws an
exception
intercept[IllegalArgumentException] {
- this.consumers.head.position(new TopicPartition(topic, 15))
+ this.consumers.head.position(new TopicPartition(topic, 15),
Long.MaxValue)
}
this.consumers.head.assign(List(tp).asJava)
- assertEquals("position() on a partition that we are subscribed to should
reset the offset", 0L, this.consumers.head.position(tp))
+ assertEquals("position() on a partition that we are subscribed to should
reset the offset", 0L, this.consumers.head.position(tp, Long.MaxValue))
this.consumers.head.commitSync()
assertEquals(0L, this.consumers.head.committed(tp).offset)
consumeAndVerifyRecords(consumer = this.consumers.head, numRecords = 5,
startingOffset = 0)
- assertEquals("After consuming 5 records, position should be 5", 5L,
this.consumers.head.position(tp))
+ assertEquals("After consuming 5 records, position should be 5", 5L,
this.consumers.head.position(tp, Long.MaxValue))
this.consumers.head.commitSync()
assertEquals("Committed offset should be returned", 5L,
this.consumers.head.committed(tp).offset)
@@ -1309,15 +1309,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// Need to poll to join the group
this.consumers.head.poll(50)
- val pos1 = this.consumers.head.position(tp)
- val pos2 = this.consumers.head.position(tp2)
+ val pos1 = this.consumers.head.position(tp, Long.MaxValue)
+ val pos2 = this.consumers.head.position(tp2, Long.MaxValue)
this.consumers.head.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp,
new OffsetAndMetadata(3L))).asJava)
assertEquals(3, this.consumers.head.committed(tp).offset)
assertNull(this.consumers.head.committed(tp2))
// Positions should not change
- assertEquals(pos1, this.consumers.head.position(tp))
- assertEquals(pos2, this.consumers.head.position(tp2))
+ assertEquals(pos1, this.consumers.head.position(tp, Long.MaxValue))
+ assertEquals(pos2, this.consumers.head.position(tp2, Long.MaxValue))
this.consumers.head.commitSync(Map[TopicPartition,
OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
assertEquals(3, this.consumers.head.committed(tp).offset)
assertEquals(5, this.consumers.head.committed(tp2).offset)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 760cc39a974..fbd7025da74 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -168,7 +168,7 @@ class TransactionsTest extends KafkaServerTestHarness {
assertEquals(2, readCommittedConsumer.assignment.size)
readCommittedConsumer.seekToEnd(readCommittedConsumer.assignment)
readCommittedConsumer.assignment.asScala.foreach { tp =>
- assertEquals(1L, readCommittedConsumer.position(tp))
+ assertEquals(1L, readCommittedConsumer.position(tp, Long.MaxValue))
}
// undecided timestamps should not be searchable either
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ad641c01fdb..60d10e6468b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1436,7 +1436,7 @@ object TestUtils extends Logging {
def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) :
Map[TopicPartition, OffsetAndMetadata] = {
val offsetsToCommit = new mutable.HashMap[TopicPartition,
OffsetAndMetadata]()
consumer.assignment.foreach{ topicPartition =>
- offsetsToCommit.put(topicPartition, new
OffsetAndMetadata(consumer.position(topicPartition)))
+ offsetsToCommit.put(topicPartition, new
OffsetAndMetadata(consumer.position(topicPartition, Long.MaxValue)))
}
offsetsToCommit.toMap
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index d9205a0c425..479930a5f04 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -169,7 +169,7 @@ private void restoreState(final StateRestoreCallback
stateRestoreCallback,
consumer.seekToBeginning(Collections.singletonList(topicPartition));
}
- long offset = consumer.position(topicPartition);
+ long offset = consumer.position(topicPartition, Long.MAX_VALUE);
final Long highWatermark = highWatermarks.get(topicPartition);
BatchingStateRestoreCallback
stateRestoreAdapter =
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 842721db533..84a650027a3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -137,7 +137,7 @@ public void restore() {
logRestoreOffsets(restorer.partition(),
restorer.checkpoint(),
endOffsets.get(restorer.partition()));
-
restorer.setStartingOffset(consumer.position(restorer.partition()));
+
restorer.setStartingOffset(consumer.position(restorer.partition(),
Long.MAX_VALUE));
} else {
consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
needsPositionUpdate.add(restorer);
@@ -145,7 +145,7 @@ public void restore() {
}
for (final StateRestorer restorer : needsPositionUpdate) {
- final long position = consumer.position(restorer.partition());
+ final long position = consumer.position(restorer.partition(),
Long.MAX_VALUE);
restorer.setStartingOffset(position);
logRestoreOffsets(restorer.partition(),
position,
@@ -234,7 +234,7 @@ private long processNext(final List<ConsumerRecord<byte[],
byte[]>> records,
}
if (nextPosition == -1) {
- nextPosition = consumer.position(restorer.partition());
+ nextPosition = consumer.position(restorer.partition(),
Long.MAX_VALUE);
}
if (!restoreRecords.isEmpty()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 8ab6052014a..d173c4995f9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -61,8 +61,8 @@ public void shouldAssignPartitionsToConsumer() throws
Exception {
@Test
public void shouldSeekToInitialOffsets() throws Exception {
stateConsumer.initialize();
- assertEquals(20L, consumer.position(topicOne));
- assertEquals(30L, consumer.position(topicTwo));
+ assertEquals(20L, consumer.position(topicOne, Long.MAX_VALUE));
+ assertEquals(30L, consumer.position(topicTwo, Long.MAX_VALUE));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index af047a79ccf..0ddf302f1d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -228,7 +228,7 @@ private static void ensureStreamsApplicationDown(final
String kafka) {
}
for (final TopicPartition tp : partitions) {
- final long offset = consumer.position(tp);
+ final long offset = consumer.position(tp, Long.MAX_VALUE);
committedOffsets.put(tp, offset);
}
}
@@ -257,7 +257,7 @@ private static void ensureStreamsApplicationDown(final
String kafka) {
throw new RuntimeException("FAIL: did receive more records
than expected for " + tp
+ " (expected EOL offset: " + readEndOffset + ";
current offset: " + record.offset());
}
- if (consumer.position(tp) >= readEndOffset) {
+ if (consumer.position(tp, Long.MAX_VALUE) >= readEndOffset) {
consumer.pause(Collections.singletonList(tp));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
index ce889170bbf..67ce7b75c35 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRestoreConsumer.java
@@ -105,7 +105,7 @@ public synchronized void assign(Collection<TopicPartition>
partitions) {
}
@Override
- public synchronized long position(TopicPartition partition) {
+ public synchronized long position(TopicPartition partition, long timeout) {
if (!partition.equals(assignedPartition))
throw new IllegalStateException("RestoreConsumer: unassigned
partition");
diff --git
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 4c21e5448a6..d0e33160312 100644
---
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -420,7 +420,7 @@ public synchronized void seekToEnd(final
Collection<TopicPartition> partitions)
public synchronized void seekToBeginning(final
Collection<TopicPartition> partitions) {}
@Override
- public synchronized long position(final TopicPartition partition) {
+ public synchronized long position(final TopicPartition partition,
final long timeout) {
return 0L;
}
};
@@ -447,7 +447,7 @@ public synchronized void seekToEnd(final
Collection<TopicPartition> partitions)
public synchronized void seekToBeginning(final
Collection<TopicPartition> partitions) {}
@Override
- public synchronized long position(final TopicPartition partition) {
+ public synchronized long position(final TopicPartition partition,
final long timeout) {
return 0L;
}
};
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 3903a3a8fae..de6b967b8a4 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -183,7 +183,7 @@ private static ArgumentParser argParser() {
private static Map<TopicPartition, OffsetAndMetadata>
consumerPositions(KafkaConsumer<String, String> consumer) {
Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
for (TopicPartition topicPartition : consumer.assignment()) {
- positions.put(topicPartition, new
OffsetAndMetadata(consumer.position(topicPartition), null));
+ positions.put(topicPartition, new
OffsetAndMetadata(consumer.position(topicPartition, Long.MAX_VALUE), null));
}
return positions;
}
@@ -199,7 +199,7 @@ private static void
resetToLastCommittedPositions(KafkaConsumer<String, String>
}
private static long messagesRemaining(KafkaConsumer<String, String>
consumer, TopicPartition partition) {
- long currentPosition = consumer.position(partition);
+ long currentPosition = consumer.position(partition, Long.MAX_VALUE);
Map<TopicPartition, Long> endOffsets =
consumer.endOffsets(singleton(partition));
if (endOffsets.containsKey(partition)) {
return endOffsets.get(partition) - currentPosition;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.2.0
> Reporter: Shixiong Zhu
> Assignee: Jason Gustafson
> Fix For: 1.1.0
>
>
> KafkaConsumer.position may hang forever when a topic is deleted. The problem is
> this line:
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", so it will just keep retrying forever on
> UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
> public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer kc = new KafkaConsumer(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set<TopicPartition> partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
> // Sleep 30 seconds to give us enough time to delete the topic.
> Thread.sleep(30000);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
> System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)