This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 04a70bd KAFKA-6829: retry commits on unknown topic or partition
(#4948)
04a70bd is described below
commit 04a70bd3fe40628462f63955c8522cae625feee3
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed May 2 20:01:28 2018 -0400
KAFKA-6829: retry commits on unknown topic or partition (#4948)
For the UNKNOWN_TOPIC_OR_PARTITION error, we could change the consumer's
behavior to retry after this error. While this is a rare case since the user
would not commit offsets for topics unless they had been able to fetch from
them, but this doesn't really handle the situation where the broker hasn't
received any metadata updates.
Reviewers: Jason Gustafson <[email protected]>, John Roesler
<[email protected]>, Guozhang Wang <[email protected]>
---
.../kafka/clients/consumer/internals/ConsumerCoordinator.java | 6 ++----
.../clients/consumer/internals/ConsumerCoordinatorTest.java | 10 ++++++----
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c96..eec070e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,7 +757,8 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// raise the error to the user
future.raise(error);
return;
- } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+ } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
+ || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
// just retry
future.raise(error);
return;
@@ -774,9 +775,6 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
resetGeneration();
future.raise(new CommitFailedException());
return;
- } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
- future.raise(new KafkaException("Topic or Partition "
+ tp + " does not exist"));
- return;
} else {
future.raise(new KafkaException("Unexpected error in
commit: " + error.message()));
return;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3e3c423..0304190 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1373,13 +1373,15 @@ public class ConsumerCoordinatorTest {
assertEquals(Arrays.asList(firstOffset, secondOffset),
committedOffsets);
}
- @Test(expected = KafkaException.class)
- public void testCommitUnknownTopicOrPartition() {
+ @Test
+ public void testRetryCommitUnknownTopicOrPartition() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady();
- prepareOffsetCommitRequest(singletonMap(t1p, 100L),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
- coordinator.commitOffsetsSync(singletonMap(t1p, new
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+ client.prepareResponse(offsetCommitResponse(singletonMap(t1p,
Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+ client.prepareResponse(offsetCommitResponse(singletonMap(t1p,
Errors.NONE)));
+
+ assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p, new
OffsetAndMetadata(100L, "metadata")), 10000));
}
@Test(expected = OffsetMetadataTooLarge.class)
--
To stop receiving notification emails like this one, please contact
[email protected].