This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4603f7495e9 KAFKA-18030 Remove old upgrade-system-tests modules
(#17843)
4603f7495e9 is described below
commit 4603f7495e981cf22c7aa5168c05b0d7a43473a3
Author: mingdaoy <[email protected]>
AuthorDate: Tue Dec 10 11:19:14 2024 +0800
KAFKA-18030 Remove old upgrade-system-tests modules (#17843)
Reviewers: Chia-Ping Tsai <[email protected]>
---
bin/kafka-run-class.sh | 8 --
build.gradle | 54 -----------
gradle/dependencies.gradle | 6 --
settings.gradle | 3 -
.../kafka/streams/tests/StreamsUpgradeTest.java | 103 --------------------
.../StreamsUpgradeToCooperativeRebalanceTest.java | 89 -----------------
.../kafka/streams/tests/StreamsUpgradeTest.java | 106 ---------------------
.../StreamsUpgradeToCooperativeRebalanceTest.java | 88 -----------------
.../kafka/streams/tests/StreamsUpgradeTest.java | 97 -------------------
.../StreamsUpgradeToCooperativeRebalanceTest.java | 84 ----------------
10 files changed, 638 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b3291e461f2..64cf6d95d51 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -116,14 +116,6 @@ else
CLASSPATH="$file":"$CLASSPATH"
fi
done
- if [ "$SHORT_VERSION_NO_DOTS" = "0100" ]; then
-
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.8.jar":"$CLASSPATH"
-
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.6.jar":"$CLASSPATH"
- fi
- if [ "$SHORT_VERSION_NO_DOTS" = "0101" ]; then
-
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zkclient-0.9.jar":"$CLASSPATH"
-
CLASSPATH="/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs/zookeeper-3.4.8.jar":"$CLASSPATH"
- fi
fi
for file in "$streams_dependant_clients_lib_dir"/rocksdb*.jar;
diff --git a/build.gradle b/build.gradle
index 5aeec1b5033..ff68ef022c6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2727,9 +2727,6 @@ project(':streams') {
':streams:integration-tests',
':streams:test-utils:test',
':streams:streams-scala:test',
- ':streams:upgrade-system-tests-0100:test',
- ':streams:upgrade-system-tests-0101:test',
- ':streams:upgrade-system-tests-0102:test',
':streams:upgrade-system-tests-0110:test',
':streams:upgrade-system-tests-10:test',
':streams:upgrade-system-tests-11:test',
@@ -2933,57 +2930,6 @@ project(':streams:examples') {
}
}
-project(':streams:upgrade-system-tests-0100') {
- base {
- archivesName = "kafka-streams-upgrade-system-tests-0100"
- }
-
- dependencies {
- testImplementation(libs.kafkaStreams_0100) {
- exclude group: 'org.slf4j', module: 'slf4j-log4j12'
- exclude group: 'log4j', module: 'log4j'
- }
- testRuntimeOnly libs.junitJupiter
- }
-
- systemTestLibs {
- dependsOn testJar
- }
-}
-
-project(':streams:upgrade-system-tests-0101') {
- base {
- archivesName = "kafka-streams-upgrade-system-tests-0101"
- }
-
- dependencies {
- testImplementation(libs.kafkaStreams_0101) {
- exclude group: 'org.slf4j', module: 'slf4j-log4j12'
- exclude group: 'log4j', module: 'log4j'
- }
- testRuntimeOnly libs.junitJupiter
- }
-
- systemTestLibs {
- dependsOn testJar
- }
-}
-
-project(':streams:upgrade-system-tests-0102') {
- base {
- archivesName = "kafka-streams-upgrade-system-tests-0102"
- }
-
- dependencies {
- testImplementation libs.kafkaStreams_0102
- testRuntimeOnly libs.junitJupiter
- }
-
- systemTestLibs {
- dependsOn testJar
- }
-}
-
project(':streams:upgrade-system-tests-0110') {
base{
archivesName = "kafka-streams-upgrade-system-tests-0110"
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index bc4d6fcea34..06176436a8c 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -84,9 +84,6 @@ versions += [
jose4j: "0.9.4",
junit: "5.10.2",
jqwik: "1.8.3",
- kafka_0100: "0.10.0.1",
- kafka_0101: "0.10.1.1",
- kafka_0102: "0.10.2.2",
kafka_0110: "0.11.0.3",
kafka_10: "1.0.2",
kafka_11: "1.1.1",
@@ -185,9 +182,6 @@ libs += [
junitPlatformLanucher:
"org.junit.platform:junit-platform-launcher:$versions.junitPlatform",
jqwik: "net.jqwik:jqwik:$versions.jqwik",
hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest",
- kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
- kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
- kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
diff --git a/settings.gradle b/settings.gradle
index a2a7dacf1a2..dd76b769025 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -84,9 +84,6 @@ include 'clients',
'streams:integration-tests',
'streams:streams-scala',
'streams:test-utils',
- 'streams:upgrade-system-tests-0100',
- 'streams:upgrade-system-tests-0101',
- 'streams:upgrade-system-tests-0102',
'streams:upgrade-system-tests-0110',
'streams:upgrade-system-tests-10',
'streams:upgrade-system-tests-11',
diff --git
a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
deleted file mode 100644
index 27712cc5ace..00000000000
---
a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Properties;
-
-public class StreamsUpgradeTest {
-
- @SuppressWarnings("unchecked")
- public static void main(final String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("StreamsUpgradeTest requires two arguments
(zookeeper-url, properties-file) but only " + args.length + " provided: "
- + (args.length > 0 ? args[0] + " " : ""));
- }
- final String zookeeper = args[0];
- final String propFileName = args[1];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
-
- System.out.println("StreamsTest instance started (StreamsUpgradeTest
v0.10.0)");
- System.out.println("zookeeper=" + zookeeper);
- System.out.println("props=" + streamsProperties);
-
- final KStreamBuilder builder = new KStreamBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
- dataStream.to("echo");
-
- final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
- config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
- config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
- config.putAll(streamsProperties);
-
- final KafkaStreams streams = new KafkaStreams(builder, config);
- streams.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- System.out.println("closing Kafka Streams instance");
- System.out.flush();
- streams.close();
- System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
- System.out.flush();
- }
- });
- }
-
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
- return new ProcessorSupplier<K, V>() {
- public Processor<K, V> get() {
- return new AbstractProcessor<K, V>() {
- private int numRecordsProcessed = 0;
-
- @Override
- public void init(final ProcessorContext context) {
- System.out.println("[0.10.0] initializing processor:
topic=data taskId=" + context.taskId());
- numRecordsProcessed = 0;
- }
-
- @Override
- public void process(final K key, final V value) {
- numRecordsProcessed++;
- if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " +
numRecordsProcessed + " records from topic=data");
- }
- }
-
- @Override
- public void punctuate(final long timestamp) {}
-
- @Override
- public void close() {}
- };
- }
- };
- }
-}
diff --git
a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 1528b2c472b..00000000000
---
a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-
-import java.util.Properties;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
- @SuppressWarnings("unchecked")
- public static void main(final String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("StreamsUpgradeToCooperativeRebalanceTest
requires two arguments (zookeeper-url, properties-file) but only " +
args.length + " provided: "
- + (args.length > 0 ? args[0] : ""));
- }
-
- final String zookeeper = args[0];
- final String propFileName = args[1];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
- final Properties config = new Properties();
-
- System.out.println("StreamsTest instance started
(StreamsUpgradeToCooperativeRebalanceTest v0.10.0)");
- System.out.println("zookeeper=" + zookeeper);
- System.out.println("props=" + config);
-
- config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"cooperative-rebalance-upgrade");
- config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
- config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
- config.putAll(streamsProperties);
-
- final String sourceTopic = config.getProperty("source.topic",
"source");
- final String sinkTopic = config.getProperty("sink.topic", "sink");
- final int reportInterval =
Integer.parseInt(config.getProperty("report.interval", "100"));
- final String upgradePhase = config.getProperty("upgrade.phase", "");
-
- final KStreamBuilder builder = new KStreamBuilder();
-
- final KStream<String, String> upgradeStream =
builder.stream(sourceTopic);
- upgradeStream.foreach(new ForeachAction<String, String>() {
- int recordCounter = 0;
-
- @Override
- public void apply(final String key, final String value) {
- if (recordCounter++ % reportInterval == 0) {
- System.out.printf("%sProcessed %d records so far%n",
upgradePhase, recordCounter);
- System.out.flush();
- }
- }
- }
- );
- upgradeStream.to(sinkTopic);
-
- final KafkaStreams streams = new KafkaStreams(builder, config);
-
-
- streams.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- streams.close();
- System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n",
upgradePhase);
- System.out.flush();
- }));
- }
-}
diff --git
a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
deleted file mode 100644
index 379720b9562..00000000000
---
a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Properties;
-
-public class StreamsUpgradeTest {
-
- /**
- * This test cannot be executed, as long as Kafka 0.10.1.2 is not released
- */
- @SuppressWarnings("unchecked")
- public static void main(final String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("StreamsUpgradeTest requires two arguments
(zookeeper-url, properties-file) but only " + args.length + " provided: "
- + (args.length > 0 ? args[0] + " " : ""));
- }
- final String zookeeper = args[0];
- final String propFileName = args[1];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
-
- System.out.println("StreamsTest instance started (StreamsUpgradeTest
v0.10.1)");
- System.out.println("zookeeper=" + zookeeper);
- System.out.println("props=" + streamsProperties);
-
- final KStreamBuilder builder = new KStreamBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
- dataStream.to("echo");
-
- final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
- config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
- config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
- config.putAll(streamsProperties);
-
- final KafkaStreams streams = new KafkaStreams(builder, config);
- streams.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- System.out.println("closing Kafka Streams instance");
- System.out.flush();
- streams.close();
- System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
- System.out.flush();
- }
- });
- }
-
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
- return new ProcessorSupplier<K, V>() {
- public Processor<K, V> get() {
- return new AbstractProcessor<K, V>() {
- private int numRecordsProcessed = 0;
-
- @Override
- public void init(final ProcessorContext context) {
- System.out.println("[0.10.1] initializing processor:
topic=data taskId=" + context.taskId());
- numRecordsProcessed = 0;
- }
-
- @Override
- public void process(final K key, final V value) {
- numRecordsProcessed++;
- if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " +
numRecordsProcessed + " records from topic=data");
- }
- }
-
- @Override
- public void punctuate(final long timestamp) {}
-
- @Override
- public void close() {}
- };
- }
- };
- }
-}
diff --git
a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
b/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 4efe70911ab..00000000000
---
a/streams/upgrade-system-tests-0101/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-
-import java.util.Properties;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
- @SuppressWarnings("unchecked")
- public static void main(final String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("StreamsUpgradeToCooperativeRebalanceTest
requires two arguments (zookeeper-url, properties-file) but only " +
args.length + " provided: "
- + (args.length > 0 ? args[0] : ""));
- }
- final String zookeeper = args[0];
- final String propFileName = args[1];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
- final Properties config = new Properties();
-
- System.out.println("StreamsTest instance started
(StreamsUpgradeToCooperativeRebalanceTest v0.10.1)");
- System.out.println("zookeeper=" + zookeeper);
- System.out.println("props=" + config);
-
- config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"cooperative-rebalance-upgrade");
- config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
- config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
- config.putAll(streamsProperties);
-
- final String sourceTopic = config.getProperty("source.topic",
"source");
- final String sinkTopic = config.getProperty("sink.topic", "sink");
- final int reportInterval =
Integer.parseInt(config.getProperty("report.interval", "100"));
- final String upgradePhase = config.getProperty("upgrade.phase", "");
-
- final KStreamBuilder builder = new KStreamBuilder();
-
- final KStream<String, String> upgradeStream =
builder.stream(sourceTopic);
- upgradeStream.foreach(new ForeachAction<String, String>() {
- int recordCounter = 0;
-
- @Override
- public void apply(final String key, final String value) {
- if (recordCounter++ % reportInterval == 0) {
- System.out.printf("%sProcessed %d records so far%n",
upgradePhase, recordCounter);
- System.out.flush();
- }
- }
- }
- );
- upgradeStream.to(sinkTopic);
-
- final KafkaStreams streams = new KafkaStreams(builder, config);
-
-
- streams.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- streams.close();
- System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n",
upgradePhase);
- System.out.flush();
- }));
- }
-}
diff --git
a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
deleted file mode 100644
index 75e548439ce..00000000000
---
a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.util.Properties;
-
-public class StreamsUpgradeTest {
-
- @SuppressWarnings("unchecked")
- public static void main(final String[] args) throws Exception {
- if (args.length < 1) {
- System.err.println("StreamsUpgradeTest requires one argument
(properties-file) but provided none");
- }
- final String propFileName = args[0];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
-
- System.out.println("StreamsTest instance started (StreamsUpgradeTest
v0.10.2)");
- System.out.println("props=" + streamsProperties);
-
- final KStreamBuilder builder = new KStreamBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
- dataStream.to("echo");
-
- final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest");
- config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
- config.putAll(streamsProperties);
-
- final KafkaStreams streams = new KafkaStreams(builder, config);
- streams.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- streams.close();
- System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
- System.out.flush();
- }
- });
- }
-
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
- return new ProcessorSupplier<K, V>() {
- public Processor<K, V> get() {
- return new AbstractProcessor<K, V>() {
- private int numRecordsProcessed = 0;
-
- @Override
- public void init(final ProcessorContext context) {
- System.out.println("[0.10.2] initializing processor:
topic=data taskId=" + context.taskId());
- numRecordsProcessed = 0;
- }
-
- @Override
- public void process(final K key, final V value) {
- numRecordsProcessed++;
- if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " +
numRecordsProcessed + " records from topic=data");
- }
- }
-
- @Override
- public void punctuate(final long timestamp) {}
-
- @Override
- public void close() {}
- };
- }
- };
- }
-}
diff --git
a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 1cc115f3c06..00000000000
---
a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-
-import java.util.Properties;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
- @SuppressWarnings("unchecked")
- public static void main(final String[] args) throws Exception {
- if (args.length < 1) {
- System.err.println("StreamsUpgradeToCooperativeRebalanceTest
requires one argument (properties-file) but none provided");
- }
- final String propFileName = args[0];
-
- final Properties streamsProperties = Utils.loadProps(propFileName);
- final Properties config = new Properties();
-
- System.out.println("StreamsTest instance started
(StreamsUpgradeToCooperativeRebalanceTest v0.10.2)");
- System.out.println("props=" + config);
-
- config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"cooperative-rebalance-upgrade");
- config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
- config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
- config.putAll(streamsProperties);
-
- final String sourceTopic = config.getProperty("source.topic",
"source");
- final String sinkTopic = config.getProperty("sink.topic", "sink");
- final int reportInterval =
Integer.parseInt(config.getProperty("report.interval", "100"));
- final String upgradePhase = config.getProperty("upgrade.phase", "");
-
- final KStreamBuilder builder = new KStreamBuilder();
-
- final KStream<String, String> upgradeStream =
builder.stream(sourceTopic);
- upgradeStream.foreach(new ForeachAction<String, String>() {
- int recordCounter = 0;
-
- @Override
- public void apply(final String key, final String value) {
- if (recordCounter++ % reportInterval == 0) {
- System.out.printf("%sProcessed %d records so far%n",
upgradePhase, recordCounter);
- System.out.flush();
- }
- }
- }
- );
- upgradeStream.to(sinkTopic);
-
- final KafkaStreams streams = new KafkaStreams(builder, config);
-
-
- streams.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- streams.close();
- System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n",
upgradePhase);
- System.out.flush();
- }));
- }
-}