This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c84d29c2e4 KAFKA-14128: Kafka Streams does not handle 
TimeoutException (#13161)
8c84d29c2e4 is described below

commit 8c84d29c2e4f5f056a46e7453c4aa634358ee7bf
Author: Lucia Cerchie <luciacerc...@gmail.com>
AuthorDate: Wed Feb 22 23:51:51 2023 -0700

    KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161)
    
    Kafka Streams is supposed to handle TimeoutException during internal topic 
creation gracefully. This PR fixes the exception handling code to avoid 
crashing on an TimeoutException returned by the admin client.
    
    Reviewer: Matthias J. Sax <matth...@confluent.io>, Colin Patrick McCabe 
<cmcc...@apache.org>, Alexandre Dupriez (@Hangleton)
---
 .../processor/internals/InternalTopicManager.java  | 17 +++---
 .../internals/InternalTopicManagerTest.java        | 69 +++++++++++++++++++---
 2 files changed, 71 insertions(+), 15 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 695492122a3..288aaa6aa8c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -57,9 +57,12 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+
+
 public class InternalTopicManager {
     private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
         "Please report at https://issues.apache.org/jira/projects/KAFKA/issues 
or to the dev-mailing list (https://kafka.apache.org/contact).";
@@ -466,6 +469,9 @@ public class InternalTopicManager {
                                                 topicName)
                                         );
                                     }
+                                } else if (cause instanceof TimeoutException) {
+                                    log.error("Creating topic {} timed out.\n" 
+
+                                            "Error message was: {}", 
topicName, cause.toString());
                                 } else {
                                     throw new StreamsException(
                                             String.format("Could not create 
topic %s.", topicName),
@@ -473,9 +479,6 @@ public class InternalTopicManager {
                                     );
                                 }
                             }
-                        } catch (final TimeoutException retriableException) {
-                            log.error("Creating topic {} timed out.\n" +
-                                    "Error message was: {}", topicName, 
retriableException.toString());
                         }
                     }
                 }
@@ -538,15 +541,15 @@ public class InternalTopicManager {
                     tempUnknownTopics.add(topicName);
                     log.debug("The leader of topic {} is not available.\n" +
                         "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof TimeoutException) {
+                    tempUnknownTopics.add(topicName);
+                    log.debug("Describing topic {} (to get number of 
partitions) timed out.\n" +
+                            "Error message was: {}", topicName, 
cause.toString());
                 } else {
                     log.error("Unexpected error during topic description for 
{}.\n" +
                         "Error message was: {}", topicName, cause.toString());
                     throw new StreamsException(String.format("Could not create 
topic %s.", topicName), cause);
                 }
-            } catch (final TimeoutException retriableException) {
-                tempUnknownTopics.add(topicName);
-                log.debug("Describing topic {} (to get number of partitions) 
timed out.\n" +
-                    "Error message was: {}", topicName, 
retriableException.toString());
             }
         }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 04c2b9c3ce8..4d0bd522977 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 package org.apache.kafka.streams.processor.internals;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -59,16 +58,17 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -87,6 +87,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class InternalTopicManagerTest {
     private final Node broker1 = new Node(0, "dummyHost-1", 1234);
@@ -304,6 +305,35 @@ public class InternalTopicManagerTest {
         );
     }
 
+    @Test
+    public void 
shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTimeout() {
+        mockAdminClient.timeoutNextRequest(1);
+
+        final InternalTopicManager internalTopicManager =
+                new InternalTopicManager(time, mockAdminClient, new 
StreamsConfig(config));
+        try {
+            final Set<String> topic1set = new 
HashSet<String>(Arrays.asList(topic1));
+            final Set<String> topic2set = new 
HashSet<String>(Arrays.asList(topic2));
+
+            internalTopicManager.getNumPartitions(topic1set, topic2set);
+
+        } catch (final TimeoutException expected) {
+            assertEquals(TimeoutException.class, 
expected.getCause().getClass());
+        }
+
+        mockAdminClient.timeoutNextRequest(1);
+
+        try {
+            final Set<String> topic1set = new 
HashSet<String>(Arrays.asList(topic1));
+            final Set<String> topic2set = new 
HashSet<String>(Arrays.asList(topic2));
+
+            internalTopicManager.getNumPartitions(topic1set, topic2set);
+
+        } catch (final TimeoutException expected) {
+            assertEquals(TimeoutException.class, 
expected.getCause().getClass());
+        }
+    }
+
     @Test
     public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() {
         final AdminClient admin = mock(AdminClient.class);
@@ -742,18 +772,24 @@ public class InternalTopicManagerTest {
 
     @Test
     public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
-        mockAdminClient.timeoutNextRequest(1);
+        mockAdminClient.timeoutNextRequest(5);
+
+        final InternalTopicManager topicManager = new InternalTopicManager(
+                new AutoAdvanceMockTime(time),
+                mockAdminClient,
+                new StreamsConfig(config)
+        );
 
         final InternalTopicConfig internalTopicConfig = new 
RepartitionTopicConfig(topic1, Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
         try {
-            internalTopicManager.makeReady(Collections.singletonMap(topic1, 
internalTopicConfig));
-            fail("Should have thrown StreamsException.");
-        } catch (final StreamsException expected) {
-            assertEquals(TimeoutException.class, 
expected.getCause().getClass());
+            topicManager.makeReady(Collections.singletonMap(topic1, 
internalTopicConfig));
+            fail("Should have thrown TimeoutException.");
+        } catch (final TimeoutException expected) {
+            assertThat(expected.getMessage(), is("Could not create topics 
within 50 milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not 
available."));
         }
     }
-
     @Test
     public void shouldLogWhenTopicNotFoundAndNotThrowException() {
         mockAdminClient.addTopic(
@@ -1681,6 +1717,21 @@ public class InternalTopicManagerTest {
         return internalTopicConfig;
     }
 
+    private static class AutoAdvanceMockTime extends MockTime {
+        private final MockTime time;
+
+        private AutoAdvanceMockTime(final MockTime time) {
+            this.time = time;
+        }
+
+        @Override
+        public long milliseconds() {
+            final long ms = time.milliseconds();
+            time.sleep(10L);
+            return ms;
+        }
+    }
+
     private static class MockCreateTopicsResult extends CreateTopicsResult {
         MockCreateTopicsResult(final Map<String, 
KafkaFuture<TopicMetadataAndConfig>> futures) {
             super(futures);
@@ -1704,4 +1755,6 @@ public class InternalTopicManagerTest {
             super(futures);
         }
     }
+
+
 }

Reply via email to