This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8c84d29c2e4 KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161) 8c84d29c2e4 is described below commit 8c84d29c2e4f5f056a46e7453c4aa634358ee7bf Author: Lucia Cerchie <luciacerc...@gmail.com> AuthorDate: Wed Feb 22 23:51:51 2023 -0700 KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161) Kafka Streams is supposed to handle TimeoutException during internal topic creation gracefully. This PR fixes the exception handling code to avoid crashing on a TimeoutException returned by the admin client. Reviewer: Matthias J. Sax <matth...@confluent.io>, Colin Patrick McCabe <cmcc...@apache.org>, Alexandre Dupriez (@Hangleton) --- .../processor/internals/InternalTopicManager.java | 17 +++--- .../internals/InternalTopicManagerTest.java | 69 +++++++++++++++++++--- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 695492122a3..288aaa6aa8c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -57,9 +57,12 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; + import java.util.function.BiConsumer; import java.util.stream.Collectors; + + public class InternalTopicManager { private final static String BUG_ERROR_MESSAGE = "This indicates a bug. 
" + "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact)."; @@ -466,6 +469,9 @@ public class InternalTopicManager { topicName) ); } + } else if (cause instanceof TimeoutException) { + log.error("Creating topic {} timed out.\n" + + "Error message was: {}", topicName, cause.toString()); } else { throw new StreamsException( String.format("Could not create topic %s.", topicName), @@ -473,9 +479,6 @@ public class InternalTopicManager { ); } } - } catch (final TimeoutException retriableException) { - log.error("Creating topic {} timed out.\n" + - "Error message was: {}", topicName, retriableException.toString()); } } } @@ -538,15 +541,15 @@ public class InternalTopicManager { tempUnknownTopics.add(topicName); log.debug("The leader of topic {} is not available.\n" + "Error message was: {}", topicName, cause.toString()); + } else if (cause instanceof TimeoutException) { + tempUnknownTopics.add(topicName); + log.debug("Describing topic {} (to get number of partitions) timed out.\n" + + "Error message was: {}", topicName, cause.toString()); } else { log.error("Unexpected error during topic description for {}.\n" + "Error message was: {}", topicName, cause.toString()); throw new StreamsException(String.format("Could not create topic %s.", topicName), cause); } - } catch (final TimeoutException retriableException) { - tempUnknownTopics.add(topicName); - log.debug("Describing topic {} (to get number of partitions) timed out.\n" + - "Error message was: {}", topicName, retriableException.toString()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 04c2b9c3ce8..4d0bd522977 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -15,7 +15,6 @@ * limitations under the License. */ package org.apache.kafka.streams.processor.internals; - import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -59,16 +58,17 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; - import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; @@ -87,6 +87,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.StrictStubs.class) public class InternalTopicManagerTest { private final Node broker1 = new Node(0, "dummyHost-1", 1234); @@ -304,6 +305,35 @@ public class InternalTopicManagerTest { ); } + @Test + public void shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTimeout() { + mockAdminClient.timeoutNextRequest(1); + + final InternalTopicManager internalTopicManager = + new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config)); + try { + final Set<String> topic1set = new HashSet<String>(Arrays.asList(topic1)); + final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2)); + + internalTopicManager.getNumPartitions(topic1set, topic2set); + + } catch (final TimeoutException expected) { + assertEquals(TimeoutException.class, expected.getCause().getClass()); + } + + mockAdminClient.timeoutNextRequest(1); + + try { + final Set<String> 
topic1set = new HashSet<String>(Arrays.asList(topic1)); + final Set<String> topic2set = new HashSet<String>(Arrays.asList(topic2)); + + internalTopicManager.getNumPartitions(topic1set, topic2set); + + } catch (final TimeoutException expected) { + assertEquals(TimeoutException.class, expected.getCause().getClass()); + } + } + @Test public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() { final AdminClient admin = mock(AdminClient.class); @@ -742,18 +772,24 @@ public class InternalTopicManagerTest { @Test public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() { - mockAdminClient.timeoutNextRequest(1); + mockAdminClient.timeoutNextRequest(5); + + final InternalTopicManager topicManager = new InternalTopicManager( + new AutoAdvanceMockTime(time), + mockAdminClient, + new StreamsConfig(config) + ); final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap()); internalTopicConfig.setNumberOfPartitions(1); try { - internalTopicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig)); - fail("Should have thrown StreamsException."); - } catch (final StreamsException expected) { - assertEquals(TimeoutException.class, expected.getCause().getClass()); + topicManager.makeReady(Collections.singletonMap(topic1, internalTopicConfig)); + fail("Should have thrown TimeoutException."); + } catch (final TimeoutException expected) { + assertThat(expected.getMessage(), is("Could not create topics within 50 milliseconds. 
" + + "This can happen if the Kafka cluster is temporarily not available.")); } } - @Test public void shouldLogWhenTopicNotFoundAndNotThrowException() { mockAdminClient.addTopic( @@ -1681,6 +1717,21 @@ public class InternalTopicManagerTest { return internalTopicConfig; } + private static class AutoAdvanceMockTime extends MockTime { + private final MockTime time; + + private AutoAdvanceMockTime(final MockTime time) { + this.time = time; + } + + @Override + public long milliseconds() { + final long ms = time.milliseconds(); + time.sleep(10L); + return ms; + } + } + private static class MockCreateTopicsResult extends CreateTopicsResult { MockCreateTopicsResult(final Map<String, KafkaFuture<TopicMetadataAndConfig>> futures) { super(futures); @@ -1704,4 +1755,6 @@ public class InternalTopicManagerTest { super(futures); } } + + }