This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8c84d29c2e4 KAFKA-14128: Kafka Streams does not handle
TimeoutException (#13161)
8c84d29c2e4 is described below
commit 8c84d29c2e4f5f056a46e7453c4aa634358ee7bf
Author: Lucia Cerchie <[email protected]>
AuthorDate: Wed Feb 22 23:51:51 2023 -0700
KAFKA-14128: Kafka Streams does not handle TimeoutException (#13161)
Kafka Streams is supposed to handle TimeoutException during internal topic
creation gracefully. This PR fixes the exception handling code to avoid
crashing on a TimeoutException returned by the admin client.
Reviewer: Matthias J. Sax <[email protected]>, Colin Patrick McCabe
<[email protected]>, Alexandre Dupriez (@Hangleton)
---
.../processor/internals/InternalTopicManager.java | 17 +++---
.../internals/InternalTopicManagerTest.java | 69 +++++++++++++++++++---
2 files changed, 71 insertions(+), 15 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 695492122a3..288aaa6aa8c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -57,9 +57,12 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+
+
public class InternalTopicManager {
private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA/issues
or to the dev-mailing list (https://kafka.apache.org/contact).";
@@ -466,6 +469,9 @@ public class InternalTopicManager {
topicName)
);
}
+ } else if (cause instanceof TimeoutException) {
+ log.error("Creating topic {} timed out.\n"
+
+ "Error message was: {}",
topicName, cause.toString());
} else {
throw new StreamsException(
String.format("Could not create
topic %s.", topicName),
@@ -473,9 +479,6 @@ public class InternalTopicManager {
);
}
}
- } catch (final TimeoutException retriableException) {
- log.error("Creating topic {} timed out.\n" +
- "Error message was: {}", topicName,
retriableException.toString());
}
}
}
@@ -538,15 +541,15 @@ public class InternalTopicManager {
tempUnknownTopics.add(topicName);
log.debug("The leader of topic {} is not available.\n" +
"Error message was: {}", topicName, cause.toString());
+ } else if (cause instanceof TimeoutException) {
+ tempUnknownTopics.add(topicName);
+ log.debug("Describing topic {} (to get number of
partitions) timed out.\n" +
+ "Error message was: {}", topicName,
cause.toString());
} else {
log.error("Unexpected error during topic description for
{}.\n" +
"Error message was: {}", topicName, cause.toString());
throw new StreamsException(String.format("Could not create
topic %s.", topicName), cause);
}
- } catch (final TimeoutException retriableException) {
- tempUnknownTopics.add(topicName);
- log.debug("Describing topic {} (to get number of partitions)
timed out.\n" +
- "Error message was: {}", topicName,
retriableException.toString());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 04c2b9c3ce8..4d0bd522977 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -15,7 +15,6 @@
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
-
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
@@ -59,16 +58,17 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
@@ -87,6 +87,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class InternalTopicManagerTest {
private final Node broker1 = new Node(0, "dummyHost-1", 1234);
@@ -304,6 +305,35 @@ public class InternalTopicManagerTest {
);
}
+ @Test
+ public void
shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTimeout() {
+ mockAdminClient.timeoutNextRequest(1);
+
+ final InternalTopicManager internalTopicManager =
+ new InternalTopicManager(time, mockAdminClient, new
StreamsConfig(config));
+ try {
+ final Set<String> topic1set = new
HashSet<String>(Arrays.asList(topic1));
+ final Set<String> topic2set = new
HashSet<String>(Arrays.asList(topic2));
+
+ internalTopicManager.getNumPartitions(topic1set, topic2set);
+
+ } catch (final TimeoutException expected) {
+ assertEquals(TimeoutException.class,
expected.getCause().getClass());
+ }
+
+ mockAdminClient.timeoutNextRequest(1);
+
+ try {
+ final Set<String> topic1set = new
HashSet<String>(Arrays.asList(topic1));
+ final Set<String> topic2set = new
HashSet<String>(Arrays.asList(topic2));
+
+ internalTopicManager.getNumPartitions(topic1set, topic2set);
+
+ } catch (final TimeoutException expected) {
+ assertEquals(TimeoutException.class,
expected.getCause().getClass());
+ }
+ }
+
@Test
public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() {
final AdminClient admin = mock(AdminClient.class);
@@ -742,18 +772,24 @@ public class InternalTopicManagerTest {
@Test
public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
- mockAdminClient.timeoutNextRequest(1);
+ mockAdminClient.timeoutNextRequest(5);
+
+ final InternalTopicManager topicManager = new InternalTopicManager(
+ new AutoAdvanceMockTime(time),
+ mockAdminClient,
+ new StreamsConfig(config)
+ );
final InternalTopicConfig internalTopicConfig = new
RepartitionTopicConfig(topic1, Collections.emptyMap());
internalTopicConfig.setNumberOfPartitions(1);
try {
- internalTopicManager.makeReady(Collections.singletonMap(topic1,
internalTopicConfig));
- fail("Should have thrown StreamsException.");
- } catch (final StreamsException expected) {
- assertEquals(TimeoutException.class,
expected.getCause().getClass());
+ topicManager.makeReady(Collections.singletonMap(topic1,
internalTopicConfig));
+ fail("Should have thrown TimeoutException.");
+ } catch (final TimeoutException expected) {
+ assertThat(expected.getMessage(), is("Could not create topics
within 50 milliseconds. " +
+ "This can happen if the Kafka cluster is temporarily not
available."));
}
}
-
@Test
public void shouldLogWhenTopicNotFoundAndNotThrowException() {
mockAdminClient.addTopic(
@@ -1681,6 +1717,21 @@ public class InternalTopicManagerTest {
return internalTopicConfig;
}
+ private static class AutoAdvanceMockTime extends MockTime {
+ private final MockTime time;
+
+ private AutoAdvanceMockTime(final MockTime time) {
+ this.time = time;
+ }
+
+ @Override
+ public long milliseconds() {
+ final long ms = time.milliseconds();
+ time.sleep(10L);
+ return ms;
+ }
+ }
+
private static class MockCreateTopicsResult extends CreateTopicsResult {
MockCreateTopicsResult(final Map<String,
KafkaFuture<TopicMetadataAndConfig>> futures) {
super(futures);
@@ -1704,4 +1755,6 @@ public class InternalTopicManagerTest {
super(futures);
}
}
+
+
}