This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2ae85ef214b KAFKA-19429: Deflake streams_smoke_test, again (#20070) 2ae85ef214b is described below commit 2ae85ef214b850b90488829e4f9487be92705fa9 Author: Lucas Brutschy <lbruts...@confluent.io> AuthorDate: Tue Jul 1 21:48:07 2025 +0200 KAFKA-19429: Deflake streams_smoke_test, again (#20070) It looks like we are checking for properties that are not guaranteed under at_least_once, for example, exact counting (not allowing for overcounting). This change relaxes the validation constraint: The TAGG topic contains effectively count-by-count results. So for example, if we have the input without duplication 0 -> 1,2,3 we will get in TAGG 3 -> 1, since 1 key had 3 values. With duplication: 0 -> 1,1,2,3 we will get in TAGG 4 -> 1, since 1 key had 4 values. This makes the result difficult to compare. Since we run the smoke test also with Exactly_Once, I propose to disable validation of TAGG under ALOS. Similarly, the topic AVG may overcount or undercount. The test case is extremely similar to DIF, both performing a join on two streams, the only difference being the mathematical operation performed, so we can also disable this validation under ALOS with minimal loss of coverage. Finally, the change fixes a bug that would throw an NPE when validation of a windowed stream would fail. Reviewers: Kirk True <k...@kirktrue.pro>, Matthias J. 
Sax <matth...@confluent.io> --- .../org/apache/kafka/streams/tests/SmokeTestDriver.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ad3cead1763..59698607912 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -522,7 +522,11 @@ public class SmokeTestDriver extends SmokeTestUtil { } boolean pass; try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) { - pass = verifyTAgg(resultStream, inputs, events.get("tagg"), validationPredicate, printResults); + pass = true; + if (eosEnabled) { + // TAGG is computing "Count-by-count", which may produce keys that are not in the input data in ALOS, so we skip validation in this case. + pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults); + } pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults); pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> { final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", ""); @@ -534,7 +538,10 @@ public class SmokeTestDriver extends SmokeTestUtil { pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), Object::equals, printResults, eosEnabled); pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, validationPredicate, printResults, eosEnabled); pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, validationPredicate, printResults, eosEnabled); - pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, validationPredicate, printResults, eosEnabled); + if (eosEnabled) { + // Average can overcount and 
undercount in ALOS, so we skip validation in that case. + pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, Object::equals, printResults, eosEnabled); + } } return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8)); } @@ -580,7 +587,7 @@ public class SmokeTestDriver extends SmokeTestUtil { if (printResults) { resultStream.printf("\t inputEvents=%n%s%n\t" + "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n", - indent("\t\t", observedInputEvents.get(key)), + indent("\t\t", observedInputEvents.getOrDefault(key, new LinkedList<>())), indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())), indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())), indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())), @@ -662,7 +669,6 @@ public class SmokeTestDriver extends SmokeTestUtil { private static boolean verifyTAgg(final PrintStream resultStream, final Map<String, Set<Integer>> allData, final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents, - final BiPredicate<Number, Number> validationPredicate, final boolean printResults) { resultStream.println("verifying topic tagg"); if (taggEvents == null) { @@ -694,7 +700,7 @@ public class SmokeTestDriver extends SmokeTestUtil { expectedCount = 0L; } - if (!validationPredicate.test(expectedCount, entry.getValue().getLast().value())) { + if (entry.getValue().getLast().value().longValue() != expectedCount) { resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount); if (printResults)