This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1623938616 MINOR: Migrate `verificationAllTransactionComplete` from
EOS system test to smoke test (#20718)
a1623938616 is described below
commit a162393861628ffc85c62dd94e218a872c770e95
Author: Jinhe Zhang <[email protected]>
AuthorDate: Mon Nov 24 18:36:55 2025 -0500
MINOR: Migrate `verificationAllTransactionComplete` from EOS system test to
smoke test (#20718)
Migrate `verificationAllTransactionComplete` from the EOS system test to
the smoke test, add `VerificationResult` to the smoke test utils, and make
`verificationAllTransactionComplete` return a `VerificationResult` to
avoid extra exception handling.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/tests/SmokeTestDriver.java | 184 ++++++++++++++++-----
.../apache/kafka/streams/tests/SmokeTestUtil.java | 18 ++
2 files changed, 159 insertions(+), 43 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 59698607912..c457d2a739b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -25,9 +25,11 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
@@ -92,6 +94,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+ private static final long MAX_IDLE_TIME_MS = 600000L;
private static class ValueList {
public final String key;
@@ -372,33 +375,52 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
- public static VerificationResult verify(final String kafka,
- final Map<String, Set<Integer>>
inputs,
- final int maxRecordsPerKey,
- final boolean eosEnabled) {
- final Properties props = new Properties();
- props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
NumberDeserializer.class);
- props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ private static class PollResult {
+ final Map<String, Map<String, LinkedList<ConsumerRecord<String,
Number>>>> events;
+ final int recordsProcessed;
+ final VerificationResult verificationResult;
+
+ PollResult(final Map<String, Map<String,
LinkedList<ConsumerRecord<String, Number>>>> events,
+ final int recordsProcessed,
+ final VerificationResult verificationResult) {
+ this.events = events;
+ this.recordsProcessed = recordsProcessed;
+ this.verificationResult = verificationResult;
+ }
+ }
- final KafkaConsumer<String, Number> consumer = new
KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer,
NUMERIC_VALUE_TOPICS);
- consumer.assign(partitions);
- consumer.seekToBeginning(partitions);
+ private static VerificationResult preVerifyTransactions(final String
kafka, final boolean eosEnabled) {
+ if (!eosEnabled) {
+ return new VerificationResult(true, "EOS is disabled; skipping
transaction verification");
+ }
+
+ final VerificationResult txnResult =
verifyAllTransactionFinished(kafka);
+ if (!txnResult.passed()) {
+ System.err.println("Transaction verification failed: " +
txnResult.result());
+ System.out.println("FAILED");
+ }
+ return txnResult;
+ }
+ private static PollResult pollAndCollect(
+ final KafkaConsumer<String, Number> consumer,
+ final Map<String, Set<Integer>> inputs,
+ final int maxRecordsPerKey,
+ final boolean eosEnabled) {
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+ final Map<String, Map<String, LinkedList<ConsumerRecord<String,
Number>>>> events = new HashMap<>();
+ VerificationResult verificationResult = new VerificationResult(false,
"no results yet");
+ final long start = System.currentTimeMillis();
int recordsProcessed = 0;
+
+ final List<TopicPartition> partitions = getAllPartitions(consumer,
NUMERIC_VALUE_TOPICS);
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
final Map<String, AtomicInteger> processed =
Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new
AtomicInteger(0)));
- final Map<String, Map<String, LinkedList<ConsumerRecord<String,
Number>>>> events = new HashMap<>();
-
- VerificationResult verificationResult = new VerificationResult(false,
"no results yet");
int retry = 0;
- final long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <
TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records =
consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
@@ -436,23 +458,32 @@ public class SmokeTestDriver extends SmokeTestUtil {
System.out.println(processed);
}
}
- consumer.close();
- final long finished = System.currentTimeMillis() - start;
+ return new PollResult(events, recordsProcessed, verificationResult);
+ }
+
+ private static VerificationResult reportAndFinalize(
+ final Map<String, Set<Integer>> inputs,
+ final int maxRecordsPerKey,
+ final long startTime,
+ final boolean eosEnabled,
+ final PollResult pollResult) {
+ final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+ final long finished = System.currentTimeMillis() - startTime;
System.out.println("Verification time=" + finished);
System.out.println("-------------------");
System.out.println("Result Verification");
System.out.println("-------------------");
System.out.println("recordGenerated=" + recordsGenerated);
- System.out.println("recordProcessed=" + recordsProcessed);
+ System.out.println("recordProcessed=" + pollResult.recordsProcessed);
- if (recordsProcessed > recordsGenerated) {
+ if (pollResult.recordsProcessed > recordsGenerated) {
System.out.println("PROCESSED-MORE-THAN-GENERATED");
- } else if (recordsProcessed < recordsGenerated) {
+ } else if (pollResult.recordsProcessed < recordsGenerated) {
System.out.println("PROCESSED-LESS-THAN-GENERATED");
}
- final Map<String, Set<Number>> received =
parseRecordsForEchoTopic(events);
+ final Map<String, Set<Number>> received =
parseRecordsForEchoTopic(pollResult.events);
boolean success = inputs.equals(received);
@@ -466,9 +497,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
System.out.println("missedRecords=" + missedCount);
}
+ VerificationResult verificationResult = pollResult.verificationResult;
// give it one more try if it's not already passing.
if (!verificationResult.passed()) {
- verificationResult = verifyAll(inputs, events, true, eosEnabled);
+ verificationResult = verifyAll(inputs, pollResult.events, true,
eosEnabled);
}
success &= verificationResult.passed();
@@ -478,6 +510,29 @@ public class SmokeTestDriver extends SmokeTestUtil {
return verificationResult;
}
+ public static VerificationResult verify(final String kafka,
+ final Map<String, Set<Integer>>
inputs,
+ final int maxRecordsPerKey,
+ final boolean eosEnabled) {
+ final VerificationResult txnResult = preVerifyTransactions(kafka,
eosEnabled);
+ if (!txnResult.passed()) {
+ return txnResult;
+ }
+
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
NumberDeserializer.class);
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+ final long start = System.currentTimeMillis();
+ try (final KafkaConsumer<String, Number> consumer = new
KafkaConsumer<>(props)) {
+ final PollResult pollResult = pollAndCollect(consumer, inputs,
maxRecordsPerKey, eosEnabled);
+ return reportAndFinalize(inputs, maxRecordsPerKey, start,
eosEnabled, pollResult);
+ }
+ }
+
private static Map<String, Set<Number>> parseRecordsForEchoTopic(
final Map<String, Map<String, LinkedList<ConsumerRecord<String,
Number>>>> events) {
return events.containsKey("echo") ?
@@ -491,24 +546,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)) : Collections.emptyMap();
}
- public static class VerificationResult {
- private final boolean passed;
- private final String result;
-
- VerificationResult(final boolean passed, final String result) {
- this.passed = passed;
- this.result = result;
- }
-
- public boolean passed() {
- return passed;
- }
-
- public String result() {
- return result;
- }
- }
-
private static VerificationResult verifyAll(final Map<String,
Set<Integer>> inputs,
final Map<String, Map<String,
LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults,
@@ -732,4 +769,65 @@ public class SmokeTestDriver extends SmokeTestUtil {
return partitions;
}
+ private static Properties createConsumerPropsWithByteDeserializer(final
String kafka, final String clientId) {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
+ return props;
+ }
+
+ private static VerificationResult verifyAllTransactionFinished(final
String kafka) {
+ final Properties txnProps =
createConsumerPropsWithByteDeserializer(kafka, "verifier");
+ txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString());
+ try (final KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(txnProps)) {
+ // Get all output topics except "data" (which is the input topic)
+ final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
+ .filter(topic -> !topic.equals("data"))
+ .toArray(String[]::new);
+
+ final List<TopicPartition> partitions = getAllPartitions(consumer,
outputTopics);
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+ for (final TopicPartition tp : partitions) {
+ System.out.println(tp + " at position " +
consumer.position(tp));
+ }
+ final Properties consumerProps =
createConsumerPropsWithByteDeserializer(kafka, "consumer-uncommitted");
+
+ final long maxWaitTime = System.currentTimeMillis() +
MAX_IDLE_TIME_MS;
+ try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new
KafkaConsumer<>(consumerProps)) {
+ while (!partitions.isEmpty() && System.currentTimeMillis() <
maxWaitTime) {
+ consumer.seekToEnd(partitions);
+ final Map<TopicPartition, Long> topicEndOffsets =
consumerUncommitted.endOffsets(partitions);
+
+ final java.util.Iterator<TopicPartition> iterator =
partitions.iterator();
+ while (iterator.hasNext()) {
+ final TopicPartition topicPartition = iterator.next();
+ final long position =
consumer.position(topicPartition);
+
+ if (position == topicEndOffsets.get(topicPartition)) {
+ iterator.remove();
+ System.out.println("Removing " + topicPartition +
" at position " + position);
+ } else if (position >
topicEndOffsets.get(topicPartition)) {
+ return new VerificationResult(false, "Offset for
partition " + topicPartition + " is larger than topic endOffset: " + position +
" > " + topicEndOffsets.get(topicPartition));
+ } else {
+ System.out.println("Retry " + topicPartition + "
at position " + position);
+ }
+ }
+ sleep(1000L);
+ }
+ }
+
+ if (!partitions.isEmpty()) {
+ return new VerificationResult(false, "Could not read all
verification records. Did not receive any new record within the last " +
(MAX_IDLE_TIME_MS / 1000L) + " sec.");
+ }
+ return new VerificationResult(true, "All transactions finished
successfully");
+ } catch (final Exception e) {
+ e.printStackTrace(System.err);
+ System.out.println("FAILED");
+ return new VerificationResult(false, "Transaction verification
failed: " + e.getMessage());
+ }
+ }
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index d0ad6c8cabb..a195980abb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -134,4 +134,22 @@ public class SmokeTestUtil {
} catch (final Exception ignore) { }
}
+ public static class VerificationResult {
+ private final boolean passed;
+ private final String result;
+
+ public VerificationResult(final boolean passed, final String result) {
+ this.passed = passed;
+ this.result = result;
+ }
+
+ public boolean passed() {
+ return passed;
+ }
+
+ public String result() {
+ return result;
+ }
+ }
+
}