This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4603f7495e9 KAFKA-18030 Remove old upgrade-system-tests modules (#17843)
4603f7495e9 is described below

commit 4603f7495e981cf22c7aa5168c05b0d7a43473a3
Author: mingdaoy <[email protected]>
AuthorDate: Tue Dec 10 11:19:14 2024 +0800

    KAFKA-18030 Remove old upgrade-system-tests modules (#17843)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 bin/kafka-run-class.sh                             |   8 --
 build.gradle                                       |  54 -----------
 gradle/dependencies.gradle                         |   6 --
 settings.gradle                                    |   3 -
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 103 --------------------
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  89 -----------------
 .../kafka/streams/tests/StreamsUpgradeTest.java    | 106 ---------------------
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  88 -----------------
 .../kafka/streams/tests/StreamsUpgradeTest.java    |  97 -------------------
 .../StreamsUpgradeToCooperativeRebalanceTest.java  |  84 ----------------
 10 files changed, 638 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b3291e461f2..64cf6d95d51 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -116,14 +116,6 @@ else
       CLASSPATH="$file":"$CLASSPATH"
     fi
   done
-  if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
-  fi
-  if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
-    CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
-  fi
 fi
 
 for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
diff --git a/build.gradle b/build.gradle
index 5aeec1b5033..ff68ef022c6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2727,9 +2727,6 @@ project(':streams') {
             ':streams:integration-tests',
             ':streams:test-utils:test',
             ':streams:streams-scala:test',
-            ':streams:upgrade-system-tests-0100:test',
-            ':streams:upgrade-system-tests-0101:test',
-            ':streams:upgrade-system-tests-0102:test',
             ':streams:upgrade-system-tests-0110:test',
             ':streams:upgrade-system-tests-10:test',
             ':streams:upgrade-system-tests-11:test',
@@ -2933,57 +2930,6 @@ project(':streams:examples') {
   }
 }
 
-project(':streams:upgrade-system-tests-0100') {
-  base {
-    archivesName = "kafka-streams-upgrade-system-tests-0100"
-  }
-
-  dependencies {
-    testImplementation(libs.kafkaStreams_0100) {
-      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-      exclude group: 'log4j', module: 'log4j'
-    }
-    testRuntimeOnly libs.junitJupiter
-  }
-
-  systemTestLibs {
-    dependsOn testJar
-  }
-}
-
-project(':streams:upgrade-system-tests-0101') {
-  base {
-    archivesName = "kafka-streams-upgrade-system-tests-0101"
-  }
-
-  dependencies {
-    testImplementation(libs.kafkaStreams_0101) {
-      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
-      exclude group: 'log4j', module: 'log4j'
-    }
-    testRuntimeOnly libs.junitJupiter
-  }
-
-  systemTestLibs {
-    dependsOn testJar
-  }
-}
-
-project(':streams:upgrade-system-tests-0102') {
-  base {
-    archivesName = "kafka-streams-upgrade-system-tests-0102"
-  }
-
-  dependencies {
-    testImplementation libs.kafkaStreams_0102
-    testRuntimeOnly libs.junitJupiter
-  }
-
-  systemTestLibs {
-    dependsOn testJar
-  }
-}
-
 project(':streams:upgrade-system-tests-0110') {
   base{
     archivesName = "kafka-streams-upgrade-system-tests-0110"
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index bc4d6fcea34..06176436a8c 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -84,9 +84,6 @@ versions += [
   jose4j: "0.9.4",
   junit: "5.10.2",
   jqwik: "1.8.3",
-  kafka_0100: "0.10.0.1",
-  kafka_0101: "0.10.1.1",
-  kafka_0102: "0.10.2.2",
   kafka_0110: "0.11.0.3",
   kafka_10: "1.0.2",
   kafka_11: "1.1.1",
@@ -185,9 +182,6 @@ libs += [
  junitPlatformLanucher: "org.junit.platform:junit-platform-launcher:$versions.junitPlatform",
   jqwik: "net.jqwik:jqwik:$versions.jqwik",
   hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest",
-  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
-  kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
-  kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
   kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
   kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
   kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
diff --git a/settings.gradle b/settings.gradle
index a2a7dacf1a2..dd76b769025 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -84,9 +84,6 @@ include 'clients',
     'streams:integration-tests',
     'streams:streams-scala',
     'streams:test-utils',
-    'streams:upgrade-system-tests-0100',
-    'streams:upgrade-system-tests-0101',
-    'streams:upgrade-system-tests-0102',
     'streams:upgrade-system-tests-0110',
     'streams:upgrade-system-tests-10',
     'streams:upgrade-system-tests-11',
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
deleted file mode 100644
index 27712cc5ace..00000000000
--- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Properties;
-
-public class StreamsUpgradeTest {
-
-    @SuppressWarnings("unchecked")
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("StreamsUpgradeTest requires two arguments 
(zookeeper-url, properties-file) but only " + args.length + " provided: "
-                + (args.length > 0 ? args[0] + " " : ""));
-        }
-        final String zookeeper = args[0];
-        final String propFileName = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-
-        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v0.10.0)");
-        System.out.println("zookeeper=" + zookeeper);
-        System.out.println("props=" + streamsProperties);
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
-        dataStream.to("echo");
-
-        final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
-        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final KafkaStreams streams = new KafkaStreams(builder, config);
-        streams.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                System.out.println("closing Kafka Streams instance");
-                System.out.flush();
-                streams.close();
-                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
-    }
-
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
-        return new ProcessorSupplier<K, V>() {
-            public Processor<K, V> get() {
-                return new AbstractProcessor<K, V>() {
-                    private int numRecordsProcessed = 0;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        System.out.println("[0.10.0] initializing processor: 
topic=data taskId=" + context.taskId());
-                        numRecordsProcessed = 0;
-                    }
-
-                    @Override
-                    public void process(final K key, final V value) {
-                        numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
-                        }
-                    }
-
-                    @Override
-                    public void punctuate(final long timestamp) {}
-
-                    @Override
-                    public void close() {}
-                };
-            }
-        };
-    }
-}
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 1528b2c472b..00000000000
--- a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-
-import java.util.Properties;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
-    @SuppressWarnings("unchecked")
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("StreamsUpgradeToCooperativeRebalanceTest 
requires two arguments (zookeeper-url, properties-file) but only " + 
args.length + " provided: "
-                + (args.length > 0 ? args[0] : ""));
-        }
-
-        final String zookeeper = args[0];
-        final String propFileName = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-        final Properties config = new Properties();
-
-        System.out.println("StreamsTest instance started 
(StreamsUpgradeToCooperativeRebalanceTest v0.10.0)");
-        System.out.println("zookeeper=" + zookeeper);
-        System.out.println("props=" + config);
-
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"cooperative-rebalance-upgrade");
-        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final String sourceTopic = config.getProperty("source.topic", 
"source");
-        final String sinkTopic = config.getProperty("sink.topic", "sink");
-        final int reportInterval = 
Integer.parseInt(config.getProperty("report.interval", "100"));
-        final String upgradePhase = config.getProperty("upgrade.phase",  "");
-
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        final KStream<String, String> upgradeStream = 
builder.stream(sourceTopic);
-        upgradeStream.foreach(new ForeachAction<String, String>() {
-            int recordCounter = 0;
-
-            @Override
-            public void apply(final String key, final String value) {
-                if (recordCounter++ % reportInterval == 0) {
-                    System.out.printf("%sProcessed %d records so far%n", 
upgradePhase, recordCounter);
-                    System.out.flush();
-                }
-            }
-        }
-        );
-        upgradeStream.to(sinkTopic);
-
-        final KafkaStreams streams = new KafkaStreams(builder, config);
-
-
-        streams.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            streams.close();
-            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", 
upgradePhase);
-            System.out.flush();
-        }));
-    }
-}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
deleted file mode 100644
index 379720b9562..00000000000
--- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Properties;
-
-public class StreamsUpgradeTest {
-
-    /**
-     * This test cannot be executed, as long as Kafka 0.10.1.2 is not released
-     */
-    @SuppressWarnings("unchecked")
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("StreamsUpgradeTest requires two arguments 
(zookeeper-url, properties-file) but only " + args.length + " provided: "
-                + (args.length > 0 ? args[0] + " " : ""));
-        }
-        final String zookeeper = args[0];
-        final String propFileName = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-
-        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v0.10.1)");
-        System.out.println("zookeeper=" + zookeeper);
-        System.out.println("props=" + streamsProperties);
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
-        dataStream.to("echo");
-
-        final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
-        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final KafkaStreams streams = new KafkaStreams(builder, config);
-        streams.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                System.out.println("closing Kafka Streams instance");
-                System.out.flush();
-                streams.close();
-                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
-    }
-
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
-        return new ProcessorSupplier<K, V>() {
-            public Processor<K, V> get() {
-                return new AbstractProcessor<K, V>() {
-                    private int numRecordsProcessed = 0;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        System.out.println("[0.10.1] initializing processor: 
topic=data taskId=" + context.taskId());
-                        numRecordsProcessed = 0;
-                    }
-
-                    @Override
-                    public void process(final K key, final V value) {
-                        numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
-                        }
-                    }
-
-                    @Override
-                    public void punctuate(final long timestamp) {}
-
-                    @Override
-                    public void close() {}
-                };
-            }
-        };
-    }
-}
diff --git a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 4efe70911ab..00000000000
--- a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-
-import java.util.Properties;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
-    @SuppressWarnings("unchecked")
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("StreamsUpgradeToCooperativeRebalanceTest 
requires two arguments (zookeeper-url, properties-file) but only " + 
args.length + " provided: "
-                + (args.length > 0 ? args[0] : ""));
-        }
-        final String zookeeper = args[0];
-        final String propFileName = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-        final Properties config = new Properties();
-
-        System.out.println("StreamsTest instance started 
(StreamsUpgradeToCooperativeRebalanceTest v0.10.1)");
-        System.out.println("zookeeper=" + zookeeper);
-        System.out.println("props=" + config);
-
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"cooperative-rebalance-upgrade");
-        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final String sourceTopic = config.getProperty("source.topic", 
"source");
-        final String sinkTopic = config.getProperty("sink.topic", "sink");
-        final int reportInterval = 
Integer.parseInt(config.getProperty("report.interval", "100"));
-        final String upgradePhase = config.getProperty("upgrade.phase",  "");
-
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        final KStream<String, String> upgradeStream = 
builder.stream(sourceTopic);
-        upgradeStream.foreach(new ForeachAction<String, String>() {
-            int recordCounter = 0;
-
-            @Override
-            public void apply(final String key, final String value) {
-                if (recordCounter++ % reportInterval == 0) {
-                    System.out.printf("%sProcessed %d records so far%n", 
upgradePhase, recordCounter);
-                    System.out.flush();
-                }
-            }
-        }
-        );
-        upgradeStream.to(sinkTopic);
-
-        final KafkaStreams streams = new KafkaStreams(builder, config);
-
-
-        streams.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            streams.close();
-            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", 
upgradePhase);
-            System.out.flush();
-        }));
-    }
-}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
deleted file mode 100644
index 75e548439ce..00000000000
--- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Properties;
-
-public class StreamsUpgradeTest {
-
-    @SuppressWarnings("unchecked")
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 1) {
-            System.err.println("StreamsUpgradeTest requires one argument 
(properties-file) but provided none");
-        }
-        final String propFileName = args[0];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-
-        System.out.println("StreamsTest instance started (StreamsUpgradeTest 
v0.10.2)");
-        System.out.println("props=" + streamsProperties);
-
-        final KStreamBuilder builder = new KStreamBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
-        dataStream.to("echo");
-
-        final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"StreamsUpgradeTest");
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final KafkaStreams streams = new KafkaStreams(builder, config);
-        streams.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                streams.close();
-                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
-    }
-
-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
-        return new ProcessorSupplier<K, V>() {
-            public Processor<K, V> get() {
-                return new AbstractProcessor<K, V>() {
-                    private int numRecordsProcessed = 0;
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        System.out.println("[0.10.2] initializing processor: 
topic=data taskId=" + context.taskId());
-                        numRecordsProcessed = 0;
-                    }
-
-                    @Override
-                    public void process(final K key, final V value) {
-                        numRecordsProcessed++;
-                        if (numRecordsProcessed % 100 == 0) {
-                            System.out.println("processed " + 
numRecordsProcessed + " records from topic=data");
-                        }
-                    }
-
-                    @Override
-                    public void punctuate(final long timestamp) {}
-
-                    @Override
-                    public void close() {}
-                };
-            }
-        };
-    }
-}
diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 1cc115f3c06..00000000000
--- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-
-import java.util.Properties;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
-    @SuppressWarnings("unchecked")
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 1) {
-            System.err.println("StreamsUpgradeToCooperativeRebalanceTest 
requires one argument (properties-file) but none provided");
-        }
-        final String propFileName = args[0];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-        final Properties config = new Properties();
-
-        System.out.println("StreamsTest instance started 
(StreamsUpgradeToCooperativeRebalanceTest v0.10.2)");
-        System.out.println("props=" + config);
-
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"cooperative-rebalance-upgrade");
-        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final String sourceTopic = config.getProperty("source.topic", 
"source");
-        final String sinkTopic = config.getProperty("sink.topic", "sink");
-        final int reportInterval = 
Integer.parseInt(config.getProperty("report.interval", "100"));
-        final String upgradePhase = config.getProperty("upgrade.phase",  "");
-
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        final KStream<String, String> upgradeStream = 
builder.stream(sourceTopic);
-        upgradeStream.foreach(new ForeachAction<String, String>() {
-            int recordCounter = 0;
-
-            @Override
-            public void apply(final String key, final String value) {
-                if (recordCounter++ % reportInterval == 0) {
-                    System.out.printf("%sProcessed %d records so far%n", 
upgradePhase, recordCounter);
-                    System.out.flush();
-                }
-            }
-        }
-        );
-        upgradeStream.to(sinkTopic);
-
-        final KafkaStreams streams = new KafkaStreams(builder, config);
-
-
-        streams.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-            streams.close();
-            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", 
upgradePhase);
-            System.out.flush();
-        }));
-    }
-}

Reply via email to