This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c580e25bfb MINOR: update flaky CustomHandlerIntegrationTest (#16710)
3c580e25bfb is described below

commit 3c580e25bfbbba27bd63dc335b6a1405e4ef6ef9
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jul 30 08:13:59 2024 -0700

    MINOR: update flaky CustomHandlerIntegrationTest (#16710)
    
    This PR reduces the producer's max.block.ms config (MAX_BLOCK_MS_CONFIG)
    from its default of 60 seconds to 10 seconds, to avoid a race condition
    with the 60-second test timeout.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../integration/CustomHandlerIntegrationTest.java  | 57 +++++++++++++---------
 1 file changed, 35 insertions(+), 22 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
index 3eea2ec7d84..873b2eb922d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -58,7 +59,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 @Tag("integration")
 public class CustomHandlerIntegrationTest {
     private static final int NUM_BROKERS = 1;
-    private static final int NUM_THREADS = 2;
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,
             
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", 
"false")));
 
@@ -72,28 +72,27 @@ public class CustomHandlerIntegrationTest {
         CLUSTER.stop();
     }
 
-    private final long timeout = 60000;
+    private final long timeoutMs = 60_000;
 
     // topic name
     private static final String STREAM_INPUT = "STREAM_INPUT";
     private static final String NON_EXISTING_TOPIC = "non_existing_topic";
 
+    private final AtomicReference<Throwable> caughtException = new 
AtomicReference<>();
+
     private KafkaStreams kafkaStreams;
-    AtomicReference<Throwable> caughtException;
-    Topology topology;
+    private Topology topology;
     private String appId;
 
     @BeforeEach
     public void before(final TestInfo testInfo) throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
         CLUSTER.createTopics(STREAM_INPUT);
-        caughtException = new AtomicReference<>();
         final String safeTestName = safeUniqueTestName(testInfo);
         appId = "app-" + safeTestName;
 
-
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-                .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), 
Serdes.String()));
+            .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), 
Serdes.String()));
         produceRecords();
         topology = builder.build();
     }
@@ -127,7 +126,7 @@ public class CustomHandlerIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
NUM_THREADS);
+        streamsConfiguration.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);
         return streamsConfiguration;
     }
 
@@ -147,20 +146,34 @@ public class CustomHandlerIntegrationTest {
         });
         kafkaStreams.start();
         TestUtils.waitForCondition(
-                () -> kafkaStreams.state() == State.RUNNING,
-                timeout,
-                () -> "Kafka Streams application did not reach state RUNNING 
in " + timeout + " ms");
-        while (true) {
-            if (caughtException.get() != null) {
-                final Throwable throwable = caughtException.get();
-                assertInstanceOf(StreamsException.class, throwable);
-                assertInstanceOf(TimeoutException.class, throwable.getCause());
-                assertInstanceOf(UnknownTopicOrPartitionException.class, 
throwable.getCause().getCause());
-                closeApplication(streamsConfiguration);
-                break;
-            } else {
-                Thread.sleep(100);
-            }
+            () -> kafkaStreams.state() == State.RUNNING,
+            timeoutMs,
+            () -> "Kafka Streams application did not reach state RUNNING in " 
+ timeoutMs + " ms"
+        );
+
+        TestUtils.waitForCondition(
+            this::receivedUnknownTopicOrPartitionException,
+            timeoutMs,
+            () -> "Did not receive UnknownTopicOrPartitionException"
+        );
+
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.ERROR,
+            timeoutMs,
+            () -> "Kafka Streams application did not reach state ERROR in " + 
timeoutMs + " ms"
+        );
+        closeApplication(streamsConfiguration);
+    }
+
+    private boolean receivedUnknownTopicOrPartitionException() {
+        if (caughtException.get() == null) {
+            return false;
         }
+
+        assertInstanceOf(StreamsException.class, caughtException.get());
+        assertInstanceOf(TimeoutException.class, 
caughtException.get().getCause());
+        assertInstanceOf(UnknownTopicOrPartitionException.class, 
caughtException.get().getCause().getCause());
+
+        return true;
     }
 }
\ No newline at end of file

Reply via email to