This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c238af2  MINOR: Remove throwing exception if not found from describe 
topics (#6112)
c238af2 is described below

commit c238af29bf50cade5aa10c1bb2678ad01cfbbf47
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Jan 10 13:03:11 2019 -0500

    MINOR: Remove throwing exception if not found from describe topics (#6112)
    
    We recently improved the handling of the InternalTopicManager retries with
#6085. The AdminClient will throw an InvalidTopicException if the topic is not
found. We need to ignore that exception because, when calling
AdminClient#describe, we may not yet have had a chance to create the topic,
especially in the case of internal topics.
    
    I've created a new test asserting that we continue on when an
InvalidTopicException is thrown because the topic is not found.
    
    Reviewers: John Roesler <[email protected]>, Guozhang Wang 
<[email protected]>
---
 .../processor/internals/InternalTopicManager.java  |  7 +++--
 .../internals/InternalTopicManagerTest.java        | 31 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 40c25d1..ee7fd3e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -203,9 +204,11 @@ public class InternalTopicManager {
             } catch (final ExecutionException couldNotDescribeTopicException) {
                 final Throwable cause = 
couldNotDescribeTopicException.getCause();
                 if (cause instanceof UnknownTopicOrPartitionException ||
-                    cause instanceof LeaderNotAvailableException) {
+                    cause instanceof LeaderNotAvailableException ||
+                    (cause instanceof InvalidTopicException &&
+                        cause.getMessage().equals("Topic " + topicName + " not 
found."))) {
                     // This topic didn't exist or leader is not known yet, 
proceed to try to create it
-                    log.debug("Topic {} is unknown, hence not existed yet.", 
topicName);
+                    log.debug("Topic {} is unknown or not found, hence not 
existed yet.", topicName);
                 } else {
                     log.error("Unexpected error during topic description for 
{}.\n" +
                         "Error message was: {}", topicName, cause.toString());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index e91bf32..074228a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -193,6 +194,36 @@ public class InternalTopicManagerTest {
     }
 
     @Test
+    public void shouldLogWhenTopicNotFoundAndNotThrowException() {
+        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+        final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister();
+        mockAdminClient.addTopic(
+            false,
+            topic,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, 
cluster, Collections.emptyList())),
+            null);
+
+        final InternalTopicConfig internalTopicConfig = new 
RepartitionTopicConfig(topic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final InternalTopicConfig internalTopicConfigII = new 
RepartitionTopicConfig("internal-topic", Collections.emptyMap());
+        internalTopicConfigII.setNumberOfPartitions(1);
+
+        final Map<String, InternalTopicConfig> topicConfigMap = new 
HashMap<>();
+        topicConfigMap.put(topic, internalTopicConfig);
+        topicConfigMap.put("internal-topic", internalTopicConfigII);
+
+
+        internalTopicManager.makeReady(topicConfigMap);
+        boolean foundExpectedMessage = false;
+        for (final String message : appender.getMessages()) {
+            foundExpectedMessage |= message.contains("Topic internal-topic is 
unknown or not found, hence not existed yet.");
+        }
+        assertTrue(foundExpectedMessage);
+
+    }
+
+    @Test
     public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
         mockAdminClient.addTopic(
             false,

Reply via email to