This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 04a70bd  KAFKA-6829: retry commits on unknown topic or partition 
(#4948)
04a70bd is described below

commit 04a70bd3fe40628462f63955c8522cae625feee3
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed May 2 20:01:28 2018 -0400

    KAFKA-6829: retry commits on unknown topic or partition (#4948)
    
    For the UNKNOWN_TOPIC_OR_PARTITION error, this change makes the consumer 
retry the offset commit instead of failing. This is a rare case, since the user 
would not commit offsets for topics unless they had been able to fetch from 
them, but raising the error immediately does not handle the situation where the 
broker hasn't received any metadata updates yet.
    
    Reviewers: Jason Gustafson <[email protected]>, John Roesler 
<[email protected]>, Guozhang Wang <[email protected]>
---
 .../kafka/clients/consumer/internals/ConsumerCoordinator.java  |  6 ++----
 .../clients/consumer/internals/ConsumerCoordinatorTest.java    | 10 ++++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 3c99c96..eec070e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -757,7 +757,8 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                         // raise the error to the user
                         future.raise(error);
                         return;
-                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+                    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS
+                            || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         // just retry
                         future.raise(error);
                         return;
@@ -774,9 +775,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                         resetGeneration();
                         future.raise(new CommitFailedException());
                         return;
-                    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
-                        future.raise(new KafkaException("Topic or Partition " 
+ tp + " does not exist"));
-                        return;
                     } else {
                         future.raise(new KafkaException("Unexpected error in 
commit: " + error.message()));
                         return;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3e3c423..0304190 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1373,13 +1373,15 @@ public class ConsumerCoordinatorTest {
         assertEquals(Arrays.asList(firstOffset, secondOffset), 
committedOffsets);
     }
 
-    @Test(expected = KafkaException.class)
-    public void testCommitUnknownTopicOrPartition() {
+    @Test
+    public void testRetryCommitUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        prepareOffsetCommitRequest(singletonMap(t1p, 100L), 
Errors.UNKNOWN_TOPIC_OR_PARTITION);
-        coordinator.commitOffsetsSync(singletonMap(t1p, new 
OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+        client.prepareResponse(offsetCommitResponse(singletonMap(t1p, 
Errors.NONE)));
+
+        assertTrue(coordinator.commitOffsetsSync(singletonMap(t1p, new 
OffsetAndMetadata(100L, "metadata")), 10000));
     }
 
     @Test(expected = OffsetMetadataTooLarge.class)

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to