This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71c5a426b8a KAFKA-12506: Strengthen AdjustStreamThreadCountTest with stateful counting and higher throughput (#20540)
71c5a426b8a is described below

commit 71c5a426b8a02b80b5b7d2494ac278467d2c2f29
Author: Deep Golani <[email protected]>
AuthorDate: Mon Sep 29 11:23:05 2025 -0400

    KAFKA-12506: Strengthen AdjustStreamThreadCountTest with stateful counting and higher throughput (#20540)
    
    Add count store and output topic; produce 1,000 records across 50 keys
    to better exercise concurrency.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../integration/AdjustStreamThreadCountTest.java   | 43 +++++++++++++++++++++-
 1 file changed, 41 insertions(+), 2 deletions(-)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index df96837458c..d9779b5a9c5 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogCaptureAppender;
@@ -28,6 +31,9 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -93,6 +99,7 @@ public class AdjustStreamThreadCountTest {
 
     private final List<KafkaStreams.State> stateTransitionHistory = new 
ArrayList<>();
     private static String inputTopic;
+    private static String outputTopic;
     private static StreamsBuilder builder;
     private static Properties properties;
     private static String appId = "";
@@ -103,10 +110,21 @@ public class AdjustStreamThreadCountTest {
         final String testId = safeUniqueTestName(testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
+        outputTopic = "output" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
 
         builder = new StreamsBuilder();
-        builder.stream(inputTopic);
+        // Build a simple stateful topology to exercise concurrency with state 
stores
+        final KStream<String, String> source = builder.stream(inputTopic);
+        final KTable<String, Long> counts = source
+            .groupByKey()
+            .count(Named.as("counts"), Materialized.as("counts-store"));
+        counts
+            .toStream()
+            .mapValues(Object::toString)
+            .to(outputTopic);
+
+        produceTestRecords(inputTopic, CLUSTER);
 
         properties = mkObjectProperties(
             mkMap(
@@ -121,6 +139,21 @@ public class AdjustStreamThreadCountTest {
         );
     }
 
+    private void produceTestRecords(final String inputTopic, final 
EmbeddedKafkaCluster cluster) {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-client");
+        props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(props)) {
+            for (int i = 0; i < 1000; i++) {
+                final String key = "key-" + (i % 50);
+                final String value = "value-" + i;
+                producer.send(new ProducerRecord<>(inputTopic, key, value));
+            }
+        } 
+    }
+
     private void startStreamsAndWaitForRunning(final KafkaStreams 
kafkaStreams) throws InterruptedException {
         kafkaStreams.start();
         waitForRunning();
@@ -251,7 +284,13 @@ public class AdjustStreamThreadCountTest {
                 assertTrue(latch.await(30, TimeUnit.SECONDS));
                 one.join();
                 two.join();
-
+                waitForCondition(
+                    () -> kafkaStreams.metadataForLocalThreads().size() == 
oldThreadCount &&
+                        kafkaStreams.state() == KafkaStreams.State.RUNNING,
+                    DEFAULT_DURATION.toMillis(),
+                    "Kafka Streams did not stabilize at the expected thread 
count and RUNNING state."
+                );
+                
                 threadMetadata = kafkaStreams.metadataForLocalThreads();
                 assertThat(threadMetadata.size(), equalTo(oldThreadCount));
             } catch (final AssertionError e) {

Reply via email to