This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 303c86f7a5ca2736138b76476928abb22869dfe9
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Mon Oct 23 10:33:59 2023 +0200

    KAFKA-15664: Add 3.4 Streams upgrade system tests (#14601)
    
    Reviewers: Luke Chen <show...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
 build.gradle                                       |  14 +
 gradle/dependencies.gradle                         |   2 +
 settings.gradle                                    |   1 +
 .../kafka/streams/tests/SmokeTestClient.java       | 299 +++++++++
 .../kafka/streams/tests/SmokeTestDriver.java       | 670 +++++++++++++++++++++
 .../apache/kafka/streams/tests/SmokeTestUtil.java  | 131 ++++
 .../kafka/streams/tests/StreamsSmokeTest.java      | 100 +++
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 120 ++++
 .../streams/streams_application_upgrade_test.py    |   4 +-
 .../streams/streams_broker_compatibility_test.py   |   5 +-
 .../tests/streams/streams_upgrade_test.py          |   9 +-
 11 files changed, 1347 insertions(+), 8 deletions(-)

diff --git a/build.gradle b/build.gradle
index 15019c112fd..a2df7a64a1e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2121,6 +2121,7 @@ project(':streams') {
             ':streams:upgrade-system-tests-31:test',
             ':streams:upgrade-system-tests-32:test',
             ':streams:upgrade-system-tests-33:test',
+            ':streams:upgrade-system-tests-34:test',
             ':streams:examples:test'
     ]
   )
@@ -2506,6 +2507,19 @@ project(':streams:upgrade-system-tests-33') {
   }
 }
 
+// Upgrade system-test module for the 3.4 line: its test sources are compiled against the
+// kafka-streams artifact referenced by libs.kafkaStreams_34, and its test jar is wired into
+// the systemTestLibs task.
+project(':streams:upgrade-system-tests-34') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-34"
+
+  dependencies {
+    testImplementation libs.kafkaStreams_34
+    testRuntimeOnly libs.junitJupiter
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 11a15ec05aa..4f65cb4ff23 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -105,6 +105,7 @@ versions += [
   kafka_31: "3.1.2",
   kafka_32: "3.2.3",
   kafka_33: "3.3.1",
+  kafka_34: "3.4.1",
   lz4: "1.8.0",
   mavenArtifact: "3.8.8",
   metrics: "2.2.0",
@@ -192,6 +193,7 @@ libs += [
   kafkaStreams_31: "org.apache.kafka:kafka-streams:$versions.kafka_31",
   kafkaStreams_32: "org.apache.kafka:kafka-streams:$versions.kafka_32",
   kafkaStreams_33: "org.apache.kafka:kafka-streams:$versions.kafka_33",
+  kafkaStreams_34: "org.apache.kafka:kafka-streams:$versions.kafka_34",
   log4j: "ch.qos.reload4j:reload4j:$versions.reload4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index 871c57a887c..3d641664679 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -57,6 +57,7 @@ include 'clients',
     'streams:upgrade-system-tests-31',
     'streams:upgrade-system-tests-32',
     'streams:upgrade-system-tests-33',
+    'streams:upgrade-system-tests-34',
     'tools',
     'tools:tools-api',
     'trogdor'
diff --git 
a/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
new file mode 100644
index 00000000000..dc0ad4d5601
--- /dev/null
+++ 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+    // Label for this client; prefixed to its console output and appended to the Streams client id.
+    private final String name;
+
+    // Created in start(); close() and closeAsync() dereference it, so start() must run first.
+    private KafkaStreams streams;
+
+    // The three flags below are assigned inside the state-listener / uncaught-exception callbacks
+    // registered in start() and are read from other code paths (started(), closed(), and close(),
+    // which is also registered as a JVM shutdown hook). They are all volatile so that a write made
+    // on a callback thread is visible to a reader running on a different thread.
+    private volatile boolean uncaughtException = false;
+    private volatile boolean started;
+    private volatile boolean closed;
+
+    /**
+     * Registers {@code runnable} as a JVM shutdown hook.
+     *
+     * @param name     thread name for the hook; when {@code null} a plain, unnamed {@link Thread} is used
+     * @param runnable work to execute when the JVM shuts down
+     */
+    private static void addShutdownHook(final String name, final Runnable runnable) {
+        final Thread hook = name == null
+            ? new Thread(runnable)
+            : KafkaThread.nonDaemon(name, runnable);
+        Runtime.getRuntime().addShutdownHook(hook);
+    }
+
+    /**
+     * Creates a new temporary directory with the prefix {@code kafka-} and schedules its removal
+     * for JVM exit.
+     *
+     * @return the newly created directory
+     * @throws RuntimeException wrapping the {@link IOException} if the directory cannot be created
+     */
+    private static File tempDirectory() {
+        final File dir;
+        try {
+            dir = Files.createTempDirectory("kafka-").toFile();
+        } catch (final IOException cause) {
+            throw new RuntimeException("Failed to create a temp dir", cause);
+        }
+        dir.deleteOnExit();
+
+        // In addition to deleteOnExit(), a shutdown hook calls Utils.delete on the directory;
+        // a failure there is only reported on stdout.
+        final Runnable cleanup = () -> {
+            try {
+                Utils.delete(dir);
+            } catch (final IOException e) {
+                System.out.println("Error deleting " + dir.getAbsolutePath());
+                e.printStackTrace(System.out);
+            }
+        };
+        addShutdownHook("delete-temp-file-shutdown-hook", cleanup);
+
+        return dir;
+    }
+
+    /**
+     * @param name label for this client; it prefixes the client's console output and is appended
+     *             to the Streams client id ("SmokeTest-" + name)
+     */
+    public SmokeTestClient(final String name) {
+        this.name = name;
+    }
+
+    /** @return {@code true} once the state listener installed by start() has seen REBALANCING -> RUNNING */
+    public boolean started() {
+        return started;
+    }
+
+    /** @return {@code true} once the state listener installed by start() has seen the NOT_RUNNING state */
+    public boolean closed() {
+        return closed;
+    }
+
+    /**
+     * Builds the topology, starts the Kafka Streams client and blocks for up to one minute until
+     * the state listener observes the first REBALANCING -> RUNNING transition.
+     *
+     * <p>Progress markers are printed to stdout. SMOKE-TEST-CLIENT-STARTED is printed even when the
+     * wait times out or is interrupted (after the corresponding SMOKE-TEST-CLIENT-EXCEPTION line).
+     * If the waiting thread is interrupted, its interrupt status is restored before returning.
+     *
+     * @param streamsProperties caller-supplied Streams configuration, merged by getStreamsConfig
+     */
+    public void start(final Properties streamsProperties) {
+        final Topology build = getTopology();
+        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        streams.setStateListener((newState, oldState) -> {
+            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                started = true;
+                countDownLatch.countDown();
+            }
+
+            if (newState == KafkaStreams.State.NOT_RUNNING) {
+                closed = true;
+            }
+        });
+
+        streams.setUncaughtExceptionHandler(e -> {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+            System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e);
+            e.printStackTrace(System.out);
+            // Remembered so that close() reports an exception instead of a clean shutdown.
+            uncaughtException = true;
+            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+        });
+
+        addShutdownHook("streams-shutdown-hook", this::close);
+
+        streams.start();
+        try {
+            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
+            }
+        } catch (final InterruptedException e) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+            e.printStackTrace(System.out);
+            // await() clears the interrupt status when it throws; restore it so that callers
+            // further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+        System.out.println(name + " started at " + Instant.now());
+    }
+
+    /** Requests shutdown of the Streams client with a zero timeout ({@code Duration.ZERO}). */
+    public void closeAsync() {
+        streams.close(Duration.ZERO);
+    }
+
+    /**
+     * Closes the Streams client, waiting up to one minute, and prints exactly one outcome marker:
+     * SMOKE-TEST-CLIENT-CLOSED for a clean close, SMOKE-TEST-CLIENT-EXCEPTION if the client closed
+     * after an uncaught exception was recorded, or the "Didn't close" variant if the close call
+     * reported failure.
+     */
+    public void close() {
+        final boolean stopped = streams.close(Duration.ofMinutes(1));
+
+        if (!stopped) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
+        } else if (uncaughtException) {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+        } else {
+            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
+        }
+    }
+
+    /**
+     * Assembles the Streams configuration for this client.
+     *
+     * <p>The application id, client id and a fresh temporary state directory are set first; the
+     * entries of {@code props} are copied in last, so they win on any key that collides. {@code props}
+     * is also installed as the defaults table of the returned object.
+     *
+     * @param props caller-supplied settings
+     * @return a new {@link Properties} holding the merged configuration
+     */
+    private Properties getStreamsConfig(final Properties props) {
+        final Properties merged = new Properties(props);
+        final String clientId = "SmokeTest-" + name;
+        final String stateDir = tempDirectory().getAbsolutePath();
+
+        merged.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+        merged.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
+        merged.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        merged.putAll(props);
+        return merged;
+    }
+
+    /**
+     * Builds the smoke-test topology that reads the "data" topic.
+     *
+     * <p>Sinks written by the topology: "echo" (pass-through of the input), "min", "max", "sum" and
+     * "cnt" (windowed aggregates whose keys go through Unwindow), "min-raw" / "min-suppressed" and
+     * "sws-raw" / "sws-suppressed" (windowed results with the windowed key rendered via toString()),
+     * "dif" (max - min), "avg" (sum / cnt) and "tagg" (counts re-grouped through Agg). Records whose
+     * key is "flush" are filtered out in front of every sink except "tagg".
+     *
+     * <p>Unwindow, Agg, END and the serde constants are defined outside this method.
+     *
+     * @return the assembled {@link Topology}
+     */
+    public Topology getTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Consumed<String, Integer> stringIntConsumed = 
Consumed.with(stringSerde, intSerde);
+        final KStream<String, Integer> source = builder.stream("data", 
stringIntConsumed);
+        // echo: forward every record except those keyed "flush"
+        source.filterNot((k, v) -> k.equals("flush"))
+              .to("echo", Produced.with(stringSerde, intSerde));
+        // keep records whose value is null or differs from END; all aggregations below use this stream
+        final KStream<String, Integer> data = source.filter((key, value) -> 
value == null || value != END);
+        data.process(SmokeTestUtil.printProcessorSupplier("data", name));
+
+        // min
+        final KGroupedStream<String, Integer> groupedData = 
data.groupByKey(Grouped.with(stringSerde, intSerde));
+
+        // 1-day windows with 1 minute of grace; the backing store is retained for 25 hours
+        final KTable<Windowed<String>, Integer> minAggregation = groupedData
+            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), 
Duration.ofMinutes(1)))
+            .aggregate(
+                () -> Integer.MAX_VALUE,
+                (aggKey, value, aggregate) -> (value < aggregate) ? value : 
aggregate,
+                Materialized
+                    .<String, Integer, WindowStore<Bytes, 
byte[]>>as("uwin-min")
+                    .withValueSerde(intSerde)
+                    .withRetention(Duration.ofHours(25))
+            );
+
+        streamify(minAggregation, "min-raw");
+
+        
streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), 
"min-suppressed");
+
+        minAggregation
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
+            .to("min", Produced.with(stringSerde, intSerde));
+
+        // sws: sum over 2-second windows that advance every second, with 30 seconds of grace
+        final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
+            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), 
Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
+            .reduce(Integer::sum);
+
+        streamify(smallWindowSum, "sws-raw");
+        
streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), 
"sws-suppressed");
+
+        // the "min" topic written above is read back as a table (used for printing and for "dif")
+        final KTable<String, Integer> minTable = builder.table(
+            "min",
+            Consumed.with(stringSerde, intSerde),
+            Materialized.as("minStoreName"));
+
+        
minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));
+
+        // max
+        groupedData
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
+            .aggregate(
+                () -> Integer.MIN_VALUE,
+                (aggKey, value, aggregate) -> (value > aggregate) ? value : 
aggregate,
+                Materialized.<String, Integer, WindowStore<Bytes, 
byte[]>>as("uwin-max").withValueSerde(intSerde))
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
+            .to("max", Produced.with(stringSerde, intSerde));
+
+        final KTable<String, Integer> maxTable = builder.table(
+            "max",
+            Consumed.with(stringSerde, intSerde),
+            Materialized.as("maxStoreName"));
+        
maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));
+
+        // sum
+        groupedData
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
+            .aggregate(
+                () -> 0L,
+                (aggKey, value, aggregate) -> (long) value + aggregate,
+                Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("win-sum").withValueSerde(longSerde))
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
+            .to("sum", Produced.with(stringSerde, longSerde));
+
+        final Consumed<String, Long> stringLongConsumed = 
Consumed.with(stringSerde, longSerde);
+        final KTable<String, Long> sumTable = builder.table("sum", 
stringLongConsumed);
+        
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));
+
+        // cnt
+        groupedData
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
+            .count(Materialized.as("uwin-cnt"))
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
+            .to("cnt", Produced.with(stringSerde, longSerde));
+
+        final KTable<String, Long> cntTable = builder.table(
+            "cnt",
+            Consumed.with(stringSerde, longSerde),
+            Materialized.as("cntStoreName"));
+        
cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));
+
+        // dif
+        maxTable
+            .join(
+                minTable,
+                (value1, value2) -> value1 - value2)
+            .toStream()
+            .filterNot((k, v) -> k.equals("flush"))
+            .to("dif", Produced.with(stringSerde, intSerde));
+
+        // avg
+        sumTable
+            .join(
+                cntTable,
+                (value1, value2) -> (double) value1 / (double) value2)
+            .toStream()
+            .filterNot((k, v) -> k.equals("flush"))
+            .to("avg", Produced.with(stringSerde, doubleSerde));
+
+        // test repartition
+        // (re-groups the cnt table with Agg's selector and aggregates into the in-memory store "cntByCnt")
+        final Agg agg = new Agg();
+        cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
+                .aggregate(agg.init(), agg.adder(), agg.remover(),
+                           Materialized.<String, 
Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
+                               .withKeySerde(Serdes.String())
+                               .withValueSerde(Serdes.Long()))
+                .toStream()
+                .to("tagg", Produced.with(stringSerde, longSerde));
+
+        return builder.build();
+    }
+
+    /**
+     * Publishes a windowed table to {@code topic}: records keyed "flush" are dropped and each
+     * remaining windowed key is rendered with {@code toString()} before the record is written.
+     *
+     * @param windowedTable windowed aggregation result to publish
+     * @param topic         name of the sink topic
+     */
+    private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
+        final KStream<Windowed<String>, Integer> withoutFlush = windowedTable
+            .toStream()
+            .filterNot((windowedKey, ignored) -> windowedKey.key().equals("flush"));
+        final KStream<String, Integer> stringKeyed =
+            withoutFlush.map((windowedKey, value) -> new KeyValue<>(windowedKey.toString(), value));
+        stringKeyed.to(topic, Produced.with(stringSerde, intSerde));
+    }
+}
diff --git 
a/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 00000000000..dbacbb9625b
--- /dev/null
+++ 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+    // Topics whose record values are numeric; each of them has a case in NumberDeserializer.
+    private static final String[] NUMERIC_VALUE_TOPICS = {
+        "data", "echo", "max",
+        "min", "min-suppressed", "min-raw",
+        "dif", "sum",
+        "sws-raw", "sws-suppressed",
+        "cnt", "avg", "tagg"
+    };
+
+    // Topics whose record values are strings.
+    private static final String[] STRING_VALUE_TOPICS = {"fk"};
+
+    // All topics: the numeric-valued ones followed by the string-valued ones.
+    private static final String[] TOPICS =
+        Stream.concat(Arrays.stream(NUMERIC_VALUE_TOPICS), Arrays.stream(STRING_VALUE_TOPICS))
+              .toArray(String[]::new);
+
+    // Upper bound checked against the empty-poll retry counter in verify().
+    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+    /**
+     * The integers {@code min..max} (inclusive) belonging to one key, handed out one at a time in
+     * a locally shuffled order. The key is the string {@code "<min>-<max>"}.
+     */
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(final int min, final int max) {
+            key = min + "-" + max;
+
+            values = new int[max - min + 1];
+            Arrays.setAll(values, offset -> min + offset);
+
+            // The order is randomized so that processing order is not completely predictable.
+            // Values are also used as the record timestamp (TODO: separate data and timestamp),
+            // so some correlation between time and order is kept by shuffling only within a
+            // sliding window.
+            shuffle(values, 10);
+
+            index = 0;
+        }
+
+        /** @return the next value, or {@code -1} once every value has been handed out */
+        int next() {
+            if (index >= values.length) {
+                return -1;
+            }
+            return values[index++];
+        }
+    }
+
+    /** @return a fresh copy of the full topic list, so callers cannot modify the shared array */
+    public static String[] topics() {
+        return TOPICS.clone();
+    }
+
+    /**
+     * Produces records to the "data" and "fk" topics without ever returning normally.
+     *
+     * <p>Each iteration picks a random key, takes that key's next value and sends (key, value) to
+     * "data" and (value, key) to "fk", then sleeps 2 ms. There is no exhaustion check, so once a
+     * key has handed out all of its values the {@code -1} returned by {@code ValueList.next()} is
+     * what gets sent for it.
+     *
+     * @param kafka            bootstrap servers for the producer
+     * @param numKeys          number of distinct keys
+     * @param maxRecordsPerKey number of distinct values prepared per key
+     */
+    static void generatePerpetually(final String kafka,
+                                    final int numKeys,
+                                    final int maxRecordsPerKey) {
+        final Properties producerProps = generatorProperties(kafka);
+
+        final ValueList[] data = new ValueList[numKeys];
+        Arrays.setAll(data, i -> new ValueList(i, i + maxRecordsPerKey - 1));
+
+        final Random rand = new Random();
+        int numRecordsProduced = 0;
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            for (;;) {
+                final ValueList picked = data[rand.nextInt(numKeys)];
+                final String key = picked.key;
+                final int value = picked.next();
+
+                producer.send(new ProducerRecord<>(
+                    "data",
+                    stringSerde.serializer().serialize("", key),
+                    intSerde.serializer().serialize("", value)
+                ));
+
+                producer.send(new ProducerRecord<>(
+                    "fk",
+                    intSerde.serializer().serialize("", value),
+                    stringSerde.serializer().serialize("", key)
+                ));
+
+                if (++numRecordsProduced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                }
+                Utils.sleep(2);
+            }
+        }
+    }
+
+    /**
+     * Produces {@code maxRecordsPerKey} records for each of {@code numKeys} keys to the "data"
+     * topic (and the mirrored (value, key) record to "fk"), paced so that production takes roughly
+     * {@code timeToSpend}, with a minimum pause of 2 ms per record.
+     *
+     * <p>After the main loop the producer is flushed, records whose send timed out are retried,
+     * and a final "flush" record is sent to every partition of both topics.
+     *
+     * @param kafka            bootstrap servers for the producer
+     * @param numKeys          number of distinct keys
+     * @param maxRecordsPerKey number of values produced per key
+     * @param timeToSpend      target duration used to derive the per-record pause
+     * @return unmodifiable map from each key to the set of values sent for it
+     */
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final Duration timeToSpend) {
+        final Properties producerProps = generatorProperties(kafka);
+
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            final ValueList valuesForKey = new ValueList(i, i + maxRecordsPerKey - 1);
+            data[i] = valuesForKey;
+            allData.put(valuesForKey.key, new HashSet<>());
+        }
+        final Random rand = new Random();
+
+        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+        final long pauseMs = Math.max(recordPauseTime, 2);
+
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
+
+        int numRecordsProduced = 0;
+        // data[0..remaining) holds the keys that still have values left.
+        int remaining = data.length;
+
+        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+            while (remaining > 0) {
+                final int index = rand.nextInt(remaining);
+                final ValueList chosen = data[index];
+                final int value = chosen.next();
+
+                if (value < 0) {
+                    // This key is exhausted: overwrite its slot with the last live entry.
+                    data[index] = data[--remaining];
+                    continue;
+                }
+
+                final String key = chosen.key;
+
+                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    "data",
+                    stringSerde.serializer().serialize("", key),
+                    intSerde.serializer().serialize("", value)
+                );
+                producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                final ProducerRecord<byte[], byte[]> fkRecord = new ProducerRecord<>(
+                    "fk",
+                    intSerde.serializer().serialize("", value),
+                    stringSerde.serializer().serialize("", key)
+                );
+                producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
+
+                allData.get(key).add(value);
+                if (++numRecordsProduced % 100 == 0) {
+                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
+                }
+                Utils.sleep(pauseMs);
+            }
+            producer.flush();
+
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            final byte[] flushMarker = stringSerde.serializer().serialize("", "flush");
+            flush(producer, "data", flushMarker, intSerde.serializer().serialize("", 0));
+            flush(producer, "fk", intSerde.serializer().serialize("", 0), stringSerde.serializer().serialize("", "flush"));
+        }
+        return Collections.unmodifiableMap(allData);
+    }
+
+    /**
+     * Re-sends the given records until none of them is queued for retry any more. Each round sends
+     * every pending record, flushes, and continues with whatever TestCallback queued again. If
+     * records are still pending when the fifth round has finished, an error is printed and
+     * {@code Exit.exit(1)} is called.
+     *
+     * @param producer  producer used for re-sending
+     * @param needRetry records to re-send
+     * @param keySerde  serde used only to render each record key in the log line
+     */
+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              final List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        List<ProducerRecord<byte[], byte[]>> pending = needRetry;
+        int roundsLeft = 5;
+        while (!pending.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> failedAgain = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : pending) {
+                final Object readableKey = keySerde.deserializer().deserialize("", record.key());
+                System.out.println("retry producing " + readableKey);
+                producer.send(record, new TestCallback(record, failedAgain));
+            }
+            producer.flush();
+            pending = failedAgain;
+            roundsLeft--;
+            if (roundsLeft == 0 && !pending.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    /**
+     * Sends one record with the given key and value to every partition of {@code topic}, stamped
+     * two days ahead of the current wall-clock time. Called once everything else has been sent:
+     * the timestamp is meant to be high enough to flush out all suppressed records.
+     *
+     * @param producer producer used for sending
+     * @param topic    topic whose partitions each receive one record
+     * @param keyBytes serialized key of the record
+     * @param valBytes serialized value of the record
+     */
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        final long twoDaysMs = Duration.ofDays(2).toMillis();
+        for (final PartitionInfo partition : producer.partitionsFor(topic)) {
+            final long futureTimestamp = System.currentTimeMillis() + twoDaysMs;
+            final ProducerRecord<byte[], byte[]> marker = new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                futureTimestamp,
+                keyBytes,
+                valBytes
+            );
+            producer.send(marker);
+        }
+    }
+
+    /**
+     * Builds the producer configuration used by the generators: client id "SmokeTest", byte-array
+     * key and value serializers, and {@code acks=all}.
+     *
+     * @param kafka bootstrap servers
+     * @return a new {@link Properties} instance holding the producer settings
+     */
+    private static Properties generatorProperties(final String kafka) {
+        final Properties config = new Properties();
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        config.put(ProducerConfig.ACKS_CONFIG, "all");
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        return config;
+    }
+
+    /**
+     * Send callback that queues the record for a retry when the send failed with a
+     * {@link TimeoutException}; any other failure prints the stack trace and calls
+     * {@code Exit.exit(1)}.
+     */
+    private static class TestCallback implements Callback {
+        private final ProducerRecord<byte[], byte[]> originalRecord;
+        private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+        TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+                     final List<ProducerRecord<byte[], byte[]>> needRetry) {
+            this.originalRecord = originalRecord;
+            this.needRetry = needRetry;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+            if (exception == null) {
+                return;
+            }
+            if (exception instanceof TimeoutException) {
+                needRetry.add(originalRecord);
+                return;
+            }
+            exception.printStackTrace();
+            Exit.exit(1);
+        }
+    }
+
+    /**
+     * Shuffles {@code data} in place so that each position is swapped with a randomly chosen
+     * position at most {@code windowSize - 1} slots ahead of it (clamped at the end of the array).
+     *
+     * @param data       array to shuffle in place
+     * @param windowSize size of the look-ahead window a swap partner is drawn from
+     */
+    private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+        final Random rand = new Random();
+        final int length = data.length;
+        for (int pos = 0; pos < length; pos++) {
+            // the swap partner lies inside the window that starts at pos
+            final int span = Math.min(length - pos, windowSize);
+            final int partner = pos + rand.nextInt(span);
+
+            final int displaced = data[pos];
+            data[pos] = data[partner];
+            data[partner] = displaced;
+        }
+    }
+
+    /**
+     * Deserializer that selects the int, long or double deserializer from the topic name.
+     * Topics not listed here are rejected with a {@link RuntimeException}.
+     */
+    public static class NumberDeserializer implements Deserializer<Number> {
+        @Override
+        public Number deserialize(final String topic, final byte[] data) {
+            switch (topic) {
+                case "data":
+                case "echo":
+                case "min":
+                case "min-raw":
+                case "min-suppressed":
+                case "sws-raw":
+                case "sws-suppressed":
+                case "max":
+                case "dif":
+                    return intSerde.deserializer().deserialize(topic, data);
+                case "sum":
+                case "cnt":
+                case "tagg":
+                    return longSerde.deserializer().deserialize(topic, data);
+                case "avg":
+                    return doubleSerde.deserializer().deserialize(topic, data);
+                default:
+                    throw new RuntimeException("unknown topic: " + topic);
+            }
+        }
+    }
+
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> 
inputs,
+                                            final int maxRecordsPerKey) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final KafkaConsumer<String, Number> consumer = new 
KafkaConsumer<>(props);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, 
NUMERIC_VALUE_TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        int recordsProcessed = 0;
+        final Map<String, AtomicInteger> processed =
+            Stream.of(NUMERIC_VALUE_TOPICS)
+                  .collect(Collectors.toMap(t -> t, t -> new 
AtomicInteger(0)));
+
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, 
Number>>>> events = new HashMap<>();
+
+        VerificationResult verificationResult = new VerificationResult(false, 
"no results yet");
+        int retry = 0;
+        final long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() - start < 
TimeUnit.MINUTES.toMillis(6)) {
+            final ConsumerRecords<String, Number> records = 
consumer.poll(Duration.ofSeconds(5));
+            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                verificationResult = verifyAll(inputs, events, false);
+                if (verificationResult.passed()) {
+                    break;
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more 
results, verification hasn't passed, and out of retries.");
+                    break;
+                } else {
+                    System.out.println(Instant.now() + " Didn't get any more 
results, but verification hasn't passed (yet). Retrying..." + retry);
+                }
+            } else {
+                System.out.println(Instant.now() + " Get some more results 
from " + records.partitions() + ", resetting retry.");
+
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
+
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
+
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + 
recordsProcessed);
+                        }
+                    }
+
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                          .computeIfAbsent(key, k -> new LinkedList<>())
+                          .add(record);
+                }
+
+                System.out.println(processed);
+            }
+        }
+        consumer.close();
+        final long finished = System.currentTimeMillis() - start;
+        System.out.println("Verification time=" + finished);
+        System.out.println("-------------------");
+        System.out.println("Result Verification");
+        System.out.println("-------------------");
+        System.out.println("recordGenerated=" + recordsGenerated);
+        System.out.println("recordProcessed=" + recordsProcessed);
+
+        if (recordsProcessed > recordsGenerated) {
+            System.out.println("PROCESSED-MORE-THAN-GENERATED");
+        } else if (recordsProcessed < recordsGenerated) {
+            System.out.println("PROCESSED-LESS-THAN-GENERATED");
+        }
+
+        boolean success;
+
+        final Map<String, Set<Number>> received =
+            events.get("echo")
+                  .entrySet()
+                  .stream()
+                  .map(entry -> mkEntry(
+                      entry.getKey(),
+                      
entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+                  )
+                  .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        success = inputs.equals(received);
+
+        if (success) {
+            System.out.println("ALL-RECORDS-DELIVERED");
+        } else {
+            int missedCount = 0;
+            for (final Map.Entry<String, Set<Integer>> entry : 
inputs.entrySet()) {
+                missedCount += received.get(entry.getKey()).size();
+            }
+            System.out.println("missedRecords=" + missedCount);
+        }
+
+        // give it one more try if it's not already passing.
+        if (!verificationResult.passed()) {
+            verificationResult = verifyAll(inputs, events, true);
+        }
+        success &= verificationResult.passed();
+
+        System.out.println(verificationResult.result());
+
+        System.out.println(success ? "SUCCESS" : "FAILURE");
+        return verificationResult;
+    }
+
+    /**
+     * Immutable outcome of a verification run: a pass/fail flag together with the
+     * message text that was supplied when the result was created.
+     */
+    public static class VerificationResult {
+        private final boolean ok;
+        private final String report;
+
+        VerificationResult(final boolean passed, final String result) {
+            ok = passed;
+            report = result;
+        }
+
+        /**
+         * @return the pass/fail flag given at construction
+         */
+        public boolean passed() {
+            return ok;
+        }
+
+        /**
+         * @return the message text given at construction
+         */
+        public String result() {
+            return report;
+        }
+    }
+
+    private static VerificationResult verifyAll(final Map<String, 
Set<Integer>> inputs,
+                                                final Map<String, Map<String, 
LinkedList<ConsumerRecord<String, Number>>>> events,
+                                                final boolean printResults) {
+        final ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        boolean pass;
+        try (final PrintStream resultStream = new 
PrintStream(byteArrayOutputStream)) {
+            pass = verifyTAgg(resultStream, inputs, events.get("tagg"), 
printResults);
+            pass &= verifySuppressed(resultStream, "min-suppressed", events, 
printResults);
+            pass &= verify(resultStream, "min-suppressed", inputs, events, 
windowedKey -> {
+                final String unwindowedKey = windowedKey.substring(1, 
windowedKey.length() - 1).replaceAll("@.*", "");
+                return getMin(unwindowedKey);
+            }, printResults);
+            pass &= verifySuppressed(resultStream, "sws-suppressed", events, 
printResults);
+            pass &= verify(resultStream, "min", inputs, events, 
SmokeTestDriver::getMin, printResults);
+            pass &= verify(resultStream, "max", inputs, events, 
SmokeTestDriver::getMax, printResults);
+            pass &= verify(resultStream, "dif", inputs, events, key -> 
getMax(key).intValue() - getMin(key).intValue(), printResults);
+            pass &= verify(resultStream, "sum", inputs, events, 
SmokeTestDriver::getSum, printResults);
+            pass &= verify(resultStream, "cnt", inputs, events, key1 -> 
getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+            pass &= verify(resultStream, "avg", inputs, events, 
SmokeTestDriver::getAvg, printResults);
+        }
+        return new VerificationResult(pass, new 
String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Checks the last value observed for every key of {@code topic} against the expected value.
+     * <p>
+     * Fails (returns {@code false}) when the topic has no events, when the number of observed keys
+     * differs from the number of input keys, or when the last value of any key differs from
+     * {@code keyToExpectation.apply(key)}. Verification stops at the first mismatching key.
+     *
+     * @param resultStream     sink for progress and failure messages
+     * @param topic            the output topic to verify
+     * @param inputData        the generated input values per key (only its size and key set are used)
+     * @param events           consumed records, grouped by topic and then by record key
+     * @param keyToExpectation maps a record key of {@code topic} to its expected final value
+     * @param printResults     when {@code true}, a failure also dumps the related events of the other topics
+     * @return {@code true} if every key's last value matches its expectation
+     */
+    private static boolean verify(final PrintStream resultStream,
+                                  final String topic,
+                                  final Map<String, Set<Integer>> inputData,
+                                  final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                  final Function<String, Number> keyToExpectation,
+                                  final boolean printResults) {
+        // "data" may not have been consumed at all; fall back to an empty map so that building
+        // the failure report below cannot throw a NullPointerException.
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.getOrDefault("data", emptyMap());
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+        if (outputEvents.isEmpty()) {
+            resultStream.println(topic + " is empty");
+            return false;
+        }
+
+        resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
+
+        if (outputEvents.size() != inputData.size()) {
+            resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+                                outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
+            return false;
+        }
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+            final String key = entry.getKey();
+            final Number expected = keyToExpectation.apply(key);
+            final Number actual = entry.getValue().getLast().value();
+            if (!expected.equals(actual)) {
+                resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
+
+                if (printResults) {
+                    // The output key is not necessarily an input key (e.g. windowed keys), so every
+                    // lookup defaults to an empty list instead of handing null to indent().
+                    resultStream.printf("\t inputEvents=%n%s%n\t" +
+                            "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+                        indent("\t\t", observedInputEvents.getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+                    // Topics not covered by the dump above get their own events printed as well.
+                    if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic)) {
+                        resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
+                    }
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * Checks that every key of the suppressed {@code topic} produced exactly one record.
+     * <p>
+     * A topic without any events passes trivially. Verification stops at the first key that has
+     * a number of records different from one.
+     *
+     * @param resultStream sink for progress and failure messages
+     * @param topic        the suppressed topic; its unsuppressed counterpart is derived by replacing
+     *                     {@code "-suppressed"} with {@code "-raw"}
+     * @param events       consumed records, grouped by topic and then by record key
+     * @param printResults when {@code true}, a failure also dumps the raw and input events for the key
+     * @return {@code true} if no key has more (or fewer) than one record
+     */
+    private static boolean verifySuppressed(final PrintStream resultStream,
+                                            @SuppressWarnings("SameParameterValue") final String topic,
+                                            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                                            final boolean printResults) {
+        resultStream.println("verifying suppressed " + topic);
+        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+            if (entry.getValue().size() != 1) {
+                final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
+                final String key = entry.getKey();
+                // Drop the first and last character and everything from the first '@' onwards.
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+                                    key,
+                                    indent("\t\t", entry.getValue()));
+
+                if (printResults) {
+                    // The raw topic or the input key may not have been observed; default to empty
+                    // collections so that the diagnostic dump cannot throw a NullPointerException.
+                    resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+                        indent("\t\t", events.getOrDefault(unsuppressedTopic, emptyMap()).getOrDefault(key, new LinkedList<>())),
+                        indent("\t\t", events.getOrDefault("data", emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
+                }
+
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Renders each record on its own line, prefixed with {@code prefix}.
+     *
+     * @param prefix text put in front of every record
+     * @param list   the records to render; {@code null} is treated like an empty list because
+     *               callers pass the result of map lookups that may miss
+     * @return the rendered lines, each terminated by {@code '\n'}; empty if there are no records
+     */
+    private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
+                                 final Iterable<ConsumerRecord<String, Number>> list) {
+        if (list == null) {
+            return "";
+        }
+        final StringBuilder stringBuilder = new StringBuilder();
+        for (final ConsumerRecord<String, Number> record : list) {
+            stringBuilder.append(prefix).append(record).append('\n');
+        }
+        return stringBuilder.toString();
+    }
+
+    /**
+     * Computes {@code (min + max) * (max - min + 1) / 2} for the bounds encoded in {@code key},
+     * i.e. the sum of all integers from min to max inclusive.
+     */
+    private static Long getSum(final String key) {
+        final int lowest = getMin(key).intValue();
+        final int highest = getMax(key).intValue();
+        final long endpointsTotal = (long) lowest + highest;
+        final long valueCount = highest - lowest + 1L;
+        return endpointsTotal * valueCount / 2L;
+    }
+
+    /**
+     * Computes the midpoint {@code (min + max) / 2.0} of the bounds encoded in {@code key}.
+     */
+    private static Double getAvg(final String key) {
+        final int lowest = getMin(key).intValue();
+        final int highest = getMax(key).intValue();
+        final long endpointsTotal = (long) lowest + highest;
+        return endpointsTotal / 2.0;
+    }
+
+
+    /**
+     * Verifies the "tagg" results: for every observed key, the last value must equal the number
+     * of input keys whose count ({@code max - min + 1}) equals that key.
+     *
+     * @param resultStream sink for progress and failure messages
+     * @param allData      the generated input values per key (only the keys are used)
+     * @param taggEvents   the records consumed from "tagg", grouped by key; may be {@code null}
+     * @param printResults when {@code true}, a failure additionally prints the key's events
+     * @return {@code false} if the events are missing, empty, or the first mismatch is found;
+     *         {@code true} otherwise
+     */
+    private static boolean verifyTAgg(final PrintStream resultStream,
+                                      final Map<String, Set<Integer>> allData,
+                                      final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+                                      final boolean printResults) {
+        if (taggEvents == null) {
+            resultStream.println("tagg is missing");
+            return false;
+        }
+        if (taggEvents.isEmpty()) {
+            resultStream.println("tagg is empty");
+            return false;
+        }
+
+        resultStream.println("verifying tagg");
+
+        // Expected answer: how many input keys share each count value.
+        final Map<String, Long> expectedByCount = new HashMap<>();
+        for (final String inputKey : allData.keySet()) {
+            final int lowest = getMin(inputKey).intValue();
+            final int highest = getMax(inputKey).intValue();
+            expectedByCount.merge(Long.toString(highest - lowest + 1L), 1L, Long::sum);
+        }
+
+        // Compare the last observed value of every key with the expected answer.
+        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
+            final String countKey = entry.getKey();
+            final Long removed = expectedByCount.remove(countKey);
+            // A key nobody expected must have aggregated down to zero.
+            final long expectedCount = removed == null ? 0L : removed;
+            final LinkedList<ConsumerRecord<String, Number>> observed = entry.getValue();
+
+            final boolean matches = observed.getLast().value().longValue() == expectedCount;
+            if (!matches) {
+                resultStream.println("fail: key=" + countKey + " tagg=" + observed + " expected=" + expectedCount);
+
+                if (printResults) {
+                    resultStream.println("\t taggEvents: " + observed);
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static Number getMin(final String key) {
+        return Integer.parseInt(key.split("-")[0]);
+    }
+
+    private static Number getMax(final String key) {
+        return Integer.parseInt(key.split("-")[1]);
+    }
+
+    /**
+     * Collects a {@link TopicPartition} for every partition the consumer reports for the given
+     * topics, in topic order.
+     */
+    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+        return Stream.of(topics)
+                     .flatMap(topic -> consumer.partitionsFor(topic).stream())
+                     .map(info -> new TopicPartition(info.topic(), info.partition()))
+                     .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+}
diff --git 
a/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
new file mode 100644
index 00000000000..f4b43a451ac
--- /dev/null
+++ 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {
+
+    final static int END = Integer.MAX_VALUE;
+
+    static ProcessorSupplier<Object, Object, Void, Void> 
printProcessorSupplier(final String topic) {
+        return printProcessorSupplier(topic, "");
+    }
+
+    static ProcessorSupplier<Object, Object, Void, Void> 
printProcessorSupplier(final String topic, final String name) {
+        return () -> new ContextualProcessor<Object, Object, Void, Void>() {
+            private int numRecordsProcessed = 0;
+            private long smallestOffset = Long.MAX_VALUE;
+            private long largestOffset = Long.MIN_VALUE;
+
+            @Override
+            public void init(final ProcessorContext<Void, Void> context) {
+                super.init(context);
+                System.out.println("[3.4] initializing processor: topic=" + 
topic + " taskId=" + context.taskId());
+                System.out.flush();
+                numRecordsProcessed = 0;
+                smallestOffset = Long.MAX_VALUE;
+                largestOffset = Long.MIN_VALUE;
+            }
+
+            @Override
+            public void process(final Record<Object, Object> record) {
+                numRecordsProcessed++;
+                if (numRecordsProcessed % 100 == 0) {
+                    System.out.printf("%s: %s%n", name, Instant.now());
+                    System.out.println("processed " + numRecordsProcessed + " 
records from topic=" + topic);
+                }
+
+                context().recordMetadata().ifPresent(recordMetadata -> {
+                    if (smallestOffset > recordMetadata.offset()) {
+                        smallestOffset = recordMetadata.offset();
+                    }
+                    if (largestOffset < recordMetadata.offset()) {
+                        largestOffset = recordMetadata.offset();
+                    }
+                });
+            }
+
+            @Override
+            public void close() {
+                System.out.printf("Close processor for task %s%n", 
context().taskId());
+                System.out.println("processed " + numRecordsProcessed + " 
records");
+                final long processed;
+                if (largestOffset >= smallestOffset) {
+                    processed = 1L + largestOffset - smallestOffset;
+                } else {
+                    processed = 0L;
+                }
+                System.out.println("offset " + smallestOffset + " to " + 
largestOffset + " -> processed " + processed);
+                System.out.flush();
+            }
+        };
+    }
+
+    public static final class Unwindow<K, V> implements 
KeyValueMapper<Windowed<K>, V, K> {
+        @Override
+        public K apply(final Windowed<K> winKey, final V value) {
+            return winKey.key();
+        }
+    }
+
+    public static class Agg {
+
+        KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+            return (key, value) -> new KeyValue<>(value == null ? null : 
Long.toString(value), 1L);
+        }
+
+        public Initializer<Long> init() {
+            return () -> 0L;
+        }
+
+        Aggregator<String, Long, Long> adder() {
+            return (aggKey, value, aggregate) -> aggregate + value;
+        }
+
+        Aggregator<String, Long, Long> remover() {
+            return (aggKey, value, aggregate) -> aggregate - value;
+        }
+    }
+
+    public static Serde<String> stringSerde = Serdes.String();
+
+    public static Serde<Integer> intSerde = Serdes.Integer();
+
+    static Serde<Long> longSerde = Serdes.Long();
+
+    static Serde<Double> doubleSerde = Serdes.Double();
+
+    public static void sleep(final long duration) {
+        try {
+            Thread.sleep(duration);
+        } catch (final Exception ignore) { }
+    }
+
+}
diff --git 
a/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 00000000000..5803b2fbd02
--- /dev/null
+++ 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static 
org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {
+
+    /**
+     * Command-line entry point of the smoke test.
+     *
+     * <pre>
+     *  args    ::= propFileName command [disableAutoTerminate]
+     *  command ::= "run" | "process"
+     * </pre>
+     *
+     * {@code "run"} starts the driver (data generation and, unless a third argument is present,
+     * result verification); {@code "process"} starts the stream processing client.
+     *
+     * @param args the properties file, the command, and an optional third argument whose mere
+     *             presence disables auto-termination
+     * @throws IOException declared for I/O failures; presumably raised while loading the properties file
+     */
+    public static void main(final String[] args) throws IOException {
+        if (args.length < 2) {
+            System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
+            Exit.exit(1);
+        }
+
+        final String propFileName = args[0];
+        final String command = args[1];
+        final boolean disableAutoTerminate = args.length > 2;
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+        if (kafka == null) {
+            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+            Exit.exit(1);
+        }
+
+        final boolean isProcess = "process".equals(command);
+        if (isProcess) {
+            // The client only supports these two processing guarantees.
+            final boolean supportedGuarantee =
+                StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
+                    || StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee);
+            if (!supportedGuarantee) {
+                System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
+                    StreamsConfig.EXACTLY_ONCE_V2);
+
+                Exit.exit(1);
+            }
+        }
+
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+        System.out.println("command=" + command);
+        System.out.println("props=" + streamsProperties);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+        if ("run".equals(command)) {
+            // this starts the driver (data generation and result verification)
+            final int numKeys = 10;
+            final int maxRecordsPerKey = 500;
+            if (disableAutoTerminate) {
+                generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+            } else {
+                // slow down data production to span 30 seconds so that system tests have time to
+                // do their bounces, etc.
+                final Map<String, Set<Integer>> allData =
+                    generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+            }
+        } else if (isProcess) {
+            // this starts the stream processing app
+            final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
+            client.start(streamsProperties);
+        } else {
+            System.out.println("unknown command: " + command);
+        }
+    }
+
+}
diff --git 
a/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..fd9330415ee
--- /dev/null
+++ 
b/streams/upgrade-system-tests-34/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Properties;
+
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+
+/**
+ * Streams client used by the upgrade system tests (version 3.4).
+ * <p>
+ * Reads topic "data" as a table, prints progress for every 100 records and forwards the records
+ * to topic "echo". When the property {@code test.run_fk_join} is {@code true}, it additionally
+ * joins the data with the table read from topic "fk" and writes the result to "fk-result".
+ */
+public class StreamsUpgradeTest {
+
+    /**
+     * Entry point.
+     *
+     * @param args {@code args[0]} is the path of the properties file to load
+     * @throws Exception if loading the properties or starting the client fails
+     */
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
+            // Stop here instead of failing with an ArrayIndexOutOfBoundsException on args[0] below.
+            // Fully qualified because this file does not import Exit.
+            org.apache.kafka.common.utils.Exit.exit(1);
+            return;
+        }
+        final String propFileName = args[0];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.4)");
+        System.out.println("props=" + streamsProperties);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
+        dataStream.to("echo");
+
+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataStream, fkTable);
+            } catch (final Exception e) {
+                // Best effort: the client still starts without the FK join part of the topology.
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
+        // Defaults first; anything set in the properties file overrides them.
+        final Properties config = new Properties();
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest");
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            streams.close();
+            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+            System.out.flush();
+        }));
+    }
+
+    /**
+     * Adds a foreign-key join between the primary data (converted to a table) and
+     * {@code otherTable}; the primary value is used as the foreign key and the joined value is
+     * the value of {@code otherTable}. Results are printed and written to "fk-result".
+     */
+    private static void buildFKTable(final KStream<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable.toTable()
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    /**
+     * Creates a supplier of processors that count records and print a progress line on stdout
+     * for every 100 records; nothing is forwarded downstream.
+     *
+     * @param topic the topic name used in the log output
+     */
+    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
+        return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
+            private int numRecordsProcessed = 0;
+
+            @Override
+            public void init(final ProcessorContext<KOut, VOut> context) {
+                // Let the base class store the context so that context() works in this processor.
+                super.init(context);
+                System.out.println("[3.4] initializing processor: topic=" + topic + " taskId=" + context.taskId());
+                numRecordsProcessed = 0;
+            }
+
+            @Override
+            public void process(final Record<KIn, VIn> record) {
+                numRecordsProcessed++;
+                if (numRecordsProcessed % 100 == 0) {
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
+                }
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+}
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index 8144b31229e..65895586f86 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -22,12 +22,12 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
-  LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_VERSION, KafkaVersion
+  LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, DEV_VERSION, 
KafkaVersion
 
 smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),
                        str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
                        str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1),
-                       str(LATEST_3_2), str(LATEST_3_3)]
+                       str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4)]
 dev_version = [str(DEV_VERSION)]
 
 class StreamsUpgradeTest(Test):
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py 
b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index f9f4ea96f27..ab70b1d8269 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -23,7 +23,7 @@ from kafkatest.services.verifiable_consumer import 
VerifiableConsumer
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, 
LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
-    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, KafkaVersion
+    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, KafkaVersion
 
 
 class StreamsBrokerCompatibility(Test):
@@ -64,6 +64,7 @@ class StreamsBrokerCompatibility(Test):
 
 
     @cluster(num_nodes=4)
+    @parametrize(broker_version=str(LATEST_3_4))
     @parametrize(broker_version=str(LATEST_3_3))
     @parametrize(broker_version=str(LATEST_3_2))
     @parametrize(broker_version=str(LATEST_3_1))
@@ -97,6 +98,7 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.stop()
 
     @cluster(num_nodes=4)
+    @parametrize(broker_version=str(LATEST_3_4))
     @parametrize(broker_version=str(LATEST_3_3))
     @parametrize(broker_version=str(LATEST_3_2))
     @parametrize(broker_version=str(LATEST_3_1))
@@ -130,6 +132,7 @@ class StreamsBrokerCompatibility(Test):
         self.kafka.stop()
 
     @cluster(num_nodes=4)
+    @parametrize(broker_version=str(LATEST_3_4))
     @parametrize(broker_version=str(LATEST_3_3))
     @parametrize(broker_version=str(LATEST_3_2))
     @parametrize(broker_version=str(LATEST_3_1))
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 955ce029501..1033e3ad2c5 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -26,7 +26,7 @@ from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
-    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_BRANCH, DEV_VERSION, KafkaVersion
+    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 # broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
@@ -34,8 +34,7 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
                            str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3),
                            str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
                            str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
-                           str(LATEST_3_3),
-                           str(DEV_BRANCH)]
+                           str(LATEST_3_3), str(LATEST_3_4), str(DEV_BRANCH)]
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
@@ -45,7 +44,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0
 # -> https://issues.apache.org/jira/browse/KAFKA-14646
 # thus, we cannot test two bounce rolling upgrade because we know it's broken
 # instead we add version 2.4...3.3 to the `metadata_2_versions` upgrade list
-#fk_join_versions = [str(LATEST_3_4)]
+fk_join_versions = [str(LATEST_3_4)]
 
 
 """
@@ -204,7 +203,7 @@ class StreamsUpgradeTest(Test):
     @cluster(num_nodes=6)
     @matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
     @matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])
-    #@matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
+    @matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
     def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         """
         This test verifies that the cluster successfully upgrades despite changes in the metadata and FK

Reply via email to