This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71c5a426b8a KAFKA-12506: Strengthen AdjustStreamThreadCountTest with
stateful counting and higher throughput (#20540)
71c5a426b8a is described below
commit 71c5a426b8a02b80b5b7d2494ac278467d2c2f29
Author: Deep Golani <[email protected]>
AuthorDate: Mon Sep 29 11:23:05 2025 -0400
KAFKA-12506: Strengthen AdjustStreamThreadCountTest with stateful counting
and higher throughput (#20540)
Add count store and output topic; produce 1,000 records across 50 keys
to better exercise concurrency.
Reviewers: Matthias J. Sax <[email protected]>
---
.../integration/AdjustStreamThreadCountTest.java | 43 +++++++++++++++++++++-
1 file changed, 41 insertions(+), 2 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index df96837458c..d9779b5a9c5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -17,6 +17,9 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -28,6 +31,9 @@ import
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -93,6 +99,7 @@ public class AdjustStreamThreadCountTest {
private final List<KafkaStreams.State> stateTransitionHistory = new
ArrayList<>();
private static String inputTopic;
+ private static String outputTopic;
private static StreamsBuilder builder;
private static Properties properties;
private static String appId = "";
@@ -103,10 +110,21 @@ public class AdjustStreamThreadCountTest {
final String testId = safeUniqueTestName(testInfo);
appId = "appId_" + testId;
inputTopic = "input" + testId;
+ outputTopic = "output" + testId;
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
builder = new StreamsBuilder();
- builder.stream(inputTopic);
+ // Build a simple stateful topology to exercise concurrency with state
stores
+ final KStream<String, String> source = builder.stream(inputTopic);
+ final KTable<String, Long> counts = source
+ .groupByKey()
+ .count(Named.as("counts"), Materialized.as("counts-store"));
+ counts
+ .toStream()
+ .mapValues(Object::toString)
+ .to(outputTopic);
+
+ produceTestRecords(inputTopic, CLUSTER);
properties = mkObjectProperties(
mkMap(
@@ -121,6 +139,21 @@ public class AdjustStreamThreadCountTest {
);
}
+ private void produceTestRecords(final String inputTopic, final
EmbeddedKafkaCluster cluster) {
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-client");
+ props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ try (KafkaProducer<String, String> producer = new
KafkaProducer<>(props)) {
+ for (int i = 0; i < 1000; i++) {
+ final String key = "key-" + (i % 50);
+ final String value = "value-" + i;
+ producer.send(new ProducerRecord<>(inputTopic, key, value));
+ }
+ }
+ }
+
private void startStreamsAndWaitForRunning(final KafkaStreams
kafkaStreams) throws InterruptedException {
kafkaStreams.start();
waitForRunning();
@@ -251,7 +284,13 @@ public class AdjustStreamThreadCountTest {
assertTrue(latch.await(30, TimeUnit.SECONDS));
one.join();
two.join();
-
+ waitForCondition(
+ () -> kafkaStreams.metadataForLocalThreads().size() ==
oldThreadCount &&
+ kafkaStreams.state() == KafkaStreams.State.RUNNING,
+ DEFAULT_DURATION.toMillis(),
+ "Kafka Streams did not stabilize at the expected thread
count and RUNNING state."
+ );
+
threadMetadata = kafkaStreams.metadataForLocalThreads();
assertThat(threadMetadata.size(), equalTo(oldThreadCount));
} catch (final AssertionError e) {