This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1623938616 MINOR: Migrate `verificationAllTransactionComplete` from EOS system test to smoke test (#20718)
a1623938616 is described below

commit a162393861628ffc85c62dd94e218a872c770e95
Author: Jinhe Zhang <[email protected]>
AuthorDate: Mon Nov 24 18:36:55 2025 -0500

    MINOR: Migrate `verificationAllTransactionComplete` from EOS system test to smoke test (#20718)
    
    Migrate `verificationAllTransactionComplete` from the EOS system test
    into the smoke test, move `VerificationResult` into the smoke test
    utils, and make `verificationAllTransactionComplete` return a
    `VerificationResult` to avoid extra exception handling.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/tests/SmokeTestDriver.java       | 184 ++++++++++++++++-----
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |  18 ++
 2 files changed, 159 insertions(+), 43 deletions(-)
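
[Editorial note, not part of the commit] With `VerificationResult` now
living in `SmokeTestUtil`, callers of `SmokeTestDriver.verify(...)` can
branch on the returned result instead of catching exceptions. A minimal
usage sketch; the broker address and test data below are hypothetical:

    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.streams.tests.SmokeTestDriver;
    import org.apache.kafka.streams.tests.SmokeTestUtil.VerificationResult;

    public class VerifySketch {
        public static void main(final String[] args) {
            final String kafka = "localhost:9092";             // hypothetical broker
            final Map<String, Set<Integer>> inputs = Map.of(); // hypothetical test input
            final VerificationResult result =
                SmokeTestDriver.verify(kafka, inputs, 10, true);
            if (!result.passed()) {
                System.err.println("verification failed: " + result.result());
            }
        }
    }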

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 59698607912..c457d2a739b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -25,9 +25,11 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
@@ -92,6 +94,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+    private static final long MAX_IDLE_TIME_MS = 600000L;
 
     private static class ValueList {
         public final String key;
@@ -372,33 +375,52 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
     }
 
-    public static VerificationResult verify(final String kafka,
-                                            final Map<String, Set<Integer>> inputs,
-                                            final int maxRecordsPerKey,
-                                            final boolean eosEnabled) {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
-        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+    private static class PollResult {
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events;
+        final int recordsProcessed;
+        final VerificationResult verificationResult;
+
+        PollResult(final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                   final int recordsProcessed,
+                   final VerificationResult verificationResult) {
+            this.events = events;
+            this.recordsProcessed = recordsProcessed;
+            this.verificationResult = verificationResult;
+        }
+    }
 
-        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
-        consumer.assign(partitions);
-        consumer.seekToBeginning(partitions);
+    private static VerificationResult preVerifyTransactions(final String kafka, final boolean eosEnabled) {
+        if (!eosEnabled) {
+            return new VerificationResult(true, "EOS is disabled; skipping transaction verification");
+        }
+
+        final VerificationResult txnResult = verifyAllTransactionFinished(kafka);
+        if (!txnResult.passed()) {
+            System.err.println("Transaction verification failed: " + txnResult.result());
+            System.out.println("FAILED");
+        }
+        return txnResult;
+    }
 
+    private static PollResult pollAndCollect(
+            final KafkaConsumer<String, Number> consumer,
+            final Map<String, Set<Integer>> inputs,
+            final int maxRecordsPerKey,
+            final boolean eosEnabled) {
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        final long start = System.currentTimeMillis();
         int recordsProcessed = 0;
+
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
         final Map<String, AtomicInteger> processed =
             Stream.of(NUMERIC_VALUE_TOPICS)
                   .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
-        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
-
-        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
         int retry = 0;
-        final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
             final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
@@ -436,23 +458,32 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 System.out.println(processed);
             }
         }
-        consumer.close();
 
-        final long finished = System.currentTimeMillis() - start;
+        return new PollResult(events, recordsProcessed, verificationResult);
+    }
+
+    private static VerificationResult reportAndFinalize(
+            final Map<String, Set<Integer>> inputs,
+            final int maxRecordsPerKey,
+            final long startTime,
+            final boolean eosEnabled,
+            final PollResult pollResult) {
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        final long finished = System.currentTimeMillis() - startTime;
         System.out.println("Verification time=" + finished);
         System.out.println("-------------------");
         System.out.println("Result Verification");
         System.out.println("-------------------");
         System.out.println("recordGenerated=" + recordsGenerated);
-        System.out.println("recordProcessed=" + recordsProcessed);
+        System.out.println("recordProcessed=" + pollResult.recordsProcessed);
 
-        if (recordsProcessed > recordsGenerated) {
+        if (pollResult.recordsProcessed > recordsGenerated) {
             System.out.println("PROCESSED-MORE-THAN-GENERATED");
-        } else if (recordsProcessed < recordsGenerated) {
+        } else if (pollResult.recordsProcessed < recordsGenerated) {
             System.out.println("PROCESSED-LESS-THAN-GENERATED");
         }
 
-        final Map<String, Set<Number>> received = parseRecordsForEchoTopic(events);
+        final Map<String, Set<Number>> received = parseRecordsForEchoTopic(pollResult.events);
 
         boolean success = inputs.equals(received);
 
@@ -466,9 +497,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
             System.out.println("missedRecords=" + missedCount);
         }
 
+        VerificationResult verificationResult = pollResult.verificationResult;
         // give it one more try if it's not already passing.
         if (!verificationResult.passed()) {
-            verificationResult = verifyAll(inputs, events, true, eosEnabled);
+            verificationResult = verifyAll(inputs, pollResult.events, true, eosEnabled);
         }
         success &= verificationResult.passed();
 
@@ -478,6 +510,29 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return verificationResult;
     }
 
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey,
+                                            final boolean eosEnabled) {
+        final VerificationResult txnResult = preVerifyTransactions(kafka, eosEnabled);
+        if (!txnResult.passed()) {
+            return txnResult;
+        }
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final long start = System.currentTimeMillis();
+        try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
+            final PollResult pollResult = pollAndCollect(consumer, inputs, maxRecordsPerKey, eosEnabled);
+            return reportAndFinalize(inputs, maxRecordsPerKey, start, eosEnabled, pollResult);
+        }
+    }
+
     private static Map<String, Set<Number>> parseRecordsForEchoTopic(
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
         return events.containsKey("echo") ?
@@ -491,24 +546,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) : Collections.emptyMap();
     }
 
-    public static class VerificationResult {
-        private final boolean passed;
-        private final String result;
-
-        VerificationResult(final boolean passed, final String result) {
-            this.passed = passed;
-            this.result = result;
-        }
-
-        public boolean passed() {
-            return passed;
-        }
-
-        public String result() {
-            return result;
-        }
-    }
-
     private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
                                                 final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
                                                 final boolean printResults,
@@ -732,4 +769,65 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return partitions;
     }
 
+    private static Properties createConsumerPropsWithByteDeserializer(final String kafka, final String clientId) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        return props;
+    }
+
+    private static VerificationResult verifyAllTransactionFinished(final String kafka) {
+        final Properties txnProps = createConsumerPropsWithByteDeserializer(kafka, "verifier");
+        txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(txnProps)) {
+            // Get all output topics except "data" (which is the input topic)
+            final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
+                    .filter(topic -> !topic.equals("data"))
+                    .toArray(String[]::new);
+
+            final List<TopicPartition> partitions = getAllPartitions(consumer, outputTopics);
+            consumer.assign(partitions);
+            consumer.seekToEnd(partitions);
+            for (final TopicPartition tp : partitions) {
+                System.out.println(tp + " at position " + consumer.position(tp));
+            }
+            final Properties consumerProps = createConsumerPropsWithByteDeserializer(kafka, "consumer-uncommitted");
+
+            final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+            try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
+                while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
+                    consumer.seekToEnd(partitions);
+                    final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
+
+                    final java.util.Iterator<TopicPartition> iterator = partitions.iterator();
+                    while (iterator.hasNext()) {
+                        final TopicPartition topicPartition = iterator.next();
+                        final long position = consumer.position(topicPartition);
+
+                        if (position == topicEndOffsets.get(topicPartition)) {
+                            iterator.remove();
+                            System.out.println("Removing " + topicPartition + " at position " + position);
+                        } else if (position > topicEndOffsets.get(topicPartition)) {
+                            return new VerificationResult(false, "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+                        } else {
+                            System.out.println("Retry " + topicPartition + " at position " + position);
+                        }
+                    }
+                    sleep(1000L);
+                }
+            }
+
+            if (!partitions.isEmpty()) {
+                return new VerificationResult(false, "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
+            }
+            return new VerificationResult(true, "All transactions finished successfully");
+        } catch (final Exception e) {
+            e.printStackTrace(System.err);
+            System.out.println("FAILED");
+            return new VerificationResult(false, "Transaction verification failed: " + e.getMessage());
+        }
+    }
+
 }
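
[Editorial note, not part of the commit] `verifyAllTransactionFinished`
above relies on documented consumer behavior: for a `read_committed`
consumer, `seekToEnd()`/`position()` stop at the partition's last stable
offset (LSO), while `endOffsets()` on a `read_uncommitted` consumer
returns the log end offset. Once the two agree for every output
partition, no transaction is still open. A self-contained sketch of that
check for a single partition, assuming a reachable broker (class and
method names here are hypothetical):

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class TxnCompletionSketch {

        // True if no transaction is still open on the partition: the
        // read_committed end position (capped at the LSO) has caught up
        // to the read_uncommitted log end offset.
        static boolean transactionsFinished(final String bootstrap, final TopicPartition tp) {
            try (KafkaConsumer<byte[], byte[]> committed = consumer(bootstrap, "read_committed");
                 KafkaConsumer<byte[], byte[]> uncommitted = consumer(bootstrap, "read_uncommitted")) {
                committed.assign(List.of(tp));
                committed.seekToEnd(List.of(tp));
                final long lso = committed.position(tp);
                final long leo = uncommitted.endOffsets(List.of(tp)).get(tp);
                return lso == leo;
            }
        }

        private static KafkaConsumer<byte[], byte[]> consumer(final String bootstrap, final String isolation) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolation);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            return new KafkaConsumer<>(props);
        }
    }

The loop in the diff applies the same comparison across all output
partitions, retrying until they all drain or MAX_IDLE_TIME_MS elapses.
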
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index d0ad6c8cabb..a195980abb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -134,4 +134,22 @@ public class SmokeTestUtil {
         } catch (final Exception ignore) { }
     }
 
+    public static class VerificationResult {
+        private final boolean passed;
+        private final String result;
+
+        public VerificationResult(final boolean passed, final String result) {
+            this.passed = passed;
+            this.result = result;
+        }
+
+        public boolean passed() {
+            return passed;
+        }
+
+        public String result() {
+            return result;
+        }
+    }
+
 }
