This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ae85ef214b KAFKA-19429: Deflake streams_smoke_test, again (#20070)
2ae85ef214b is described below

commit 2ae85ef214b850b90488829e4f9487be92705fa9
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Tue Jul 1 21:48:07 2025 +0200

    KAFKA-19429: Deflake streams_smoke_test, again (#20070)
    
    It looks like we are checking for properties that are not guaranteed
    under at_least_once, for example, exact counting (not allowing for
    overcounting).
    
    This change relaxes the validation constraint:
    
    The TAGG topic contains effectively count-by-count results. So for
    example, if we have the input without duplication
    
    0 -> 1,2,3 we will get in TAGG 3 -> 1, since 1 key had 3 values.
    
    with duplication:
    
    0 -> 1,1,2,3 we will get in TAGG 4 -> 1, since 1 key had 4 values.
    
    This makes the result difficult to compare. Since we run the smoke test
also with Exactly_Once, I propose to disable validation of TAGG under
    ALOS.
    
    Similarly, the topic AVG may overcount or undercount. The test case is
extremely similar to DIF, both performing a join of two streams, the
    only difference being the mathematical operation performed, so we can
    also disable this validation under ALOS with minimal loss of coverage.
    
Finally, the change fixes a bug that would throw an NPE when validation
    of a windowed stream would fail.
    
    Reviewers: Kirk True <k...@kirktrue.pro>, Matthias J. Sax
     <matth...@confluent.io>
---
 .../org/apache/kafka/streams/tests/SmokeTestDriver.java  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ad3cead1763..59698607912 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -522,7 +522,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
         boolean pass;
         try (final PrintStream resultStream = new 
PrintStream(byteArrayOutputStream)) {
-            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), 
validationPredicate, printResults);
+            pass = true;
+            if (eosEnabled) {
+                // TAGG is computing "Count-by-count", which may produce keys 
that are not in the input data in ALOS, so we skip validation in this case.
+                pass = verifyTAgg(resultStream, inputs, events.get("tagg"), 
printResults);
+            }
             pass &= verifySuppressed(resultStream, "min-suppressed", events, 
printResults);
             pass &= verify(resultStream, "min-suppressed", inputs, events, 
windowedKey -> {
                 final String unwindowedKey = windowedKey.substring(1, 
windowedKey.length() - 1).replaceAll("@.*", "");
@@ -534,7 +538,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
             pass &= verify(resultStream, "dif", inputs, events, key -> 
getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults, 
eosEnabled);
             pass &= verify(resultStream, "sum", inputs, events, 
SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled);
             pass &= verify(resultStream, "cnt", inputs, events, key1 -> 
getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate, 
printResults, eosEnabled);
-            pass &= verify(resultStream, "avg", inputs, events, 
SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled);
+            if (eosEnabled) {
+                // Average can overcount and undercount in ALOS, so we skip 
validation in that case.
+                pass &= verify(resultStream, "avg", inputs, events, 
SmokeTestDriver::getAvg, Object::equals, printResults, eosEnabled);
+            }
         }
         return new VerificationResult(pass, new 
String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
     }
@@ -580,7 +587,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     if (printResults) {
                         resultStream.printf("\t inputEvents=%n%s%n\t" +
                                 
"echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
-                            indent("\t\t", observedInputEvents.get(key)),
+                            indent("\t\t", 
observedInputEvents.getOrDefault(key, new LinkedList<>())),
                             indent("\t\t", events.getOrDefault("echo", 
emptyMap()).getOrDefault(key, new LinkedList<>())),
                             indent("\t\t", events.getOrDefault("max", 
emptyMap()).getOrDefault(key, new LinkedList<>())),
                             indent("\t\t", events.getOrDefault("min", 
emptyMap()).getOrDefault(key, new LinkedList<>())),
@@ -662,7 +669,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
     private static boolean verifyTAgg(final PrintStream resultStream,
                                       final Map<String, Set<Integer>> allData,
                                       final Map<String, 
LinkedList<ConsumerRecord<String, Number>>> taggEvents,
-                                      final BiPredicate<Number, Number> 
validationPredicate,
                                       final boolean printResults) {
         resultStream.println("verifying topic tagg");
         if (taggEvents == null) {
@@ -694,7 +700,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     expectedCount = 0L;
                 }
 
-                if (!validationPredicate.test(expectedCount, 
entry.getValue().getLast().value())) {
+                if (entry.getValue().getLast().value().longValue() != 
expectedCount) {
                     resultStream.println("fail: key=" + key + " tagg=" + 
entry.getValue() + " expected=" + expectedCount);
 
                     if (printResults)

Reply via email to