This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c580e25bfb MINOR: update flaky CustomHandlerIntegrationTest (#16710)
3c580e25bfb is described below
commit 3c580e25bfbbba27bd63dc335b6a1405e4ef6ef9
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jul 30 08:13:59 2024 -0700
MINOR: update flaky CustomHandlerIntegrationTest (#16710)
This PR reduces the MAX_BLOCK_MS config which defaults to 60sec to
10sec, to avoid a race condition with the 60sec test timeout.
Reviewers: Bill Bejeck <[email protected]>
---
.../integration/CustomHandlerIntegrationTest.java | 57 +++++++++++++---------
1 file changed, 35 insertions(+), 22 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
index 3eea2ec7d84..873b2eb922d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -58,7 +59,6 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
@Tag("integration")
public class CustomHandlerIntegrationTest {
private static final int NUM_BROKERS = 1;
- private static final int NUM_THREADS = 2;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable",
"false")));
@@ -72,28 +72,27 @@ public class CustomHandlerIntegrationTest {
CLUSTER.stop();
}
- private final long timeout = 60000;
+ private final long timeoutMs = 60_000;
// topic name
private static final String STREAM_INPUT = "STREAM_INPUT";
private static final String NON_EXISTING_TOPIC = "non_existing_topic";
+ private final AtomicReference<Throwable> caughtException = new
AtomicReference<>();
+
private KafkaStreams kafkaStreams;
- AtomicReference<Throwable> caughtException;
- Topology topology;
+ private Topology topology;
private String appId;
@BeforeEach
public void before(final TestInfo testInfo) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT);
- caughtException = new AtomicReference<>();
final String safeTestName = safeUniqueTestName(testInfo);
appId = "app-" + safeTestName;
-
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(),
Serdes.String()));
+ .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(),
Serdes.String()));
produceRecords();
topology = builder.build();
}
@@ -127,7 +126,7 @@ public class CustomHandlerIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
- streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
NUM_THREADS);
+ streamsConfiguration.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);
return streamsConfiguration;
}
@@ -147,20 +146,34 @@ public class CustomHandlerIntegrationTest {
});
kafkaStreams.start();
TestUtils.waitForCondition(
- () -> kafkaStreams.state() == State.RUNNING,
- timeout,
- () -> "Kafka Streams application did not reach state RUNNING
in " + timeout + " ms");
- while (true) {
- if (caughtException.get() != null) {
- final Throwable throwable = caughtException.get();
- assertInstanceOf(StreamsException.class, throwable);
- assertInstanceOf(TimeoutException.class, throwable.getCause());
- assertInstanceOf(UnknownTopicOrPartitionException.class,
throwable.getCause().getCause());
- closeApplication(streamsConfiguration);
- break;
- } else {
- Thread.sleep(100);
- }
+ () -> kafkaStreams.state() == State.RUNNING,
+ timeoutMs,
+ () -> "Kafka Streams application did not reach state RUNNING in "
+ timeoutMs + " ms"
+ );
+
+ TestUtils.waitForCondition(
+ this::receivedUnknownTopicOrPartitionException,
+ timeoutMs,
+ () -> "Did not receive UnknownTopicOrPartitionException"
+ );
+
+ TestUtils.waitForCondition(
+ () -> kafkaStreams.state() == State.ERROR,
+ timeoutMs,
+ () -> "Kafka Streams application did not reach state ERROR in " +
timeoutMs + " ms"
+ );
+ closeApplication(streamsConfiguration);
+ }
+
+ private boolean receivedUnknownTopicOrPartitionException() {
+ if (caughtException.get() == null) {
+ return false;
}
+
+ assertInstanceOf(StreamsException.class, caughtException.get());
+ assertInstanceOf(TimeoutException.class,
caughtException.get().getCause());
+ assertInstanceOf(UnknownTopicOrPartitionException.class,
caughtException.get().getCause().getCause());
+
+ return true;
}
}
\ No newline at end of file