This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfd5454  MINOR: Add system test for optimization upgrades (#5912)
dfd5454 is described below

commit dfd545485ab0399e0c129ff2d68fbb3113f3e8c9
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Tue Nov 27 16:07:34 2018 -0500

    MINOR: Add system test for optimization upgrades (#5912)
    
    This is a new system test that verifies optimizing an existing topology.
    The test takes the following steps:
    
    1. Start a Kafka Streams application that uses a selectKey, then performs
       3 groupByKey() operations and 1 join, creating four repartition topics
    2. Verify all instances start and process data
    3. Stop all instances and verify they have stopped
    4. For each stopped instance, update the TOPOLOGY_OPTIMIZATION config to
       "all", then restart the instance and verify it starts successfully,
       also verifying that Kafka Streams reduced the number of repartition
       topics from 4 to 1 (a config sketch follows below)
    5. Verify that each instance is processing data from the aggregation,
       reduce, and join operations
    6. Stop all instances and verify the shutdown is complete.

    For testing, I ran two passes of the system test with 25 repeats, for a
    total of 50 test runs.
    
    All test runs passed.
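
    For reference, the upgrade in step 4 amounts to flipping one config value
    before restart. A minimal sketch, assuming a populated StreamsBuilder as
    in the test class below (the application id here is illustrative, not
    taken from the test):

        final Properties config = new Properties();
        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-upgrade-sketch");
        // StreamsConfig.NO_OPTIMIZATION ("none") on first start,
        // StreamsConfig.OPTIMIZE ("all") after the bounce
        config.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        // The properties must be passed to build() so the optimizer can
        // rewrite the topology (merging the four repartition topics into one).
        final Topology topology = builder.build(config);
        final KafkaStreams streams = new KafkaStreams(topology, config);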
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../kafka/streams/tests/StreamsOptimizedTest.java  | 156 +++++++++++++++++++++
 tests/kafkatest/services/streams.py                |  26 ++++
 .../tests/streams/streams_optimized_test.py        | 150 ++++++++++++++++++++
 3 files changed, 332 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
new file mode 100644
index 0000000..6bde92f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Reducer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.time.Duration.ofMillis;
+
+public class StreamsOptimizedTest {
+
+
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("StreamsOptimizedTest requires one argument 
(properties-file) but no provided: ");
+        }
+        final String propFileName = args[0];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started 
StreamsOptimizedTest");
+        System.out.println("props=" + streamsProperties);
+
+        final String inputTopic = (String) Objects.requireNonNull(streamsProperties.remove("input.topic"));
+        final String aggregationTopic = (String) Objects.requireNonNull(streamsProperties.remove("aggregation.topic"));
+        final String reduceTopic = (String) Objects.requireNonNull(streamsProperties.remove("reduce.topic"));
+        final String joinTopic = (String) Objects.requireNonNull(streamsProperties.remove("join.topic"));
+
+
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
+
+        final Reducer<String> reducer = (v1, v2) -> Integer.toString(Integer.parseInt(v1) + Integer.parseInt(v2));
+
+        final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
+
+        final KStream<String, Long> countStream = mappedStream.groupByKey()
+            .count(Materialized.with(Serdes.String(), Serdes.Long()))
+            .toStream();
+
+        mappedStream.groupByKey().aggregate(
+            initializer,
+            aggregator,
+            Materialized.with(Serdes.String(), Serdes.Integer()))
+            .toStream()
+            .peek((k, v) -> System.out.println(String.format("AGGREGATED 
key=%s value=%s", k, v)))
+            .to(aggregationTopic, Produced.with(Serdes.String(), 
Serdes.Integer()));
+
+
+        mappedStream.groupByKey()
+            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
+            .toStream()
+            .peek((k, v) -> System.out.println(String.format("REDUCED key=%s value=%s", k, v)))
+            .to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        mappedStream.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
+            JoinWindows.of(ofMillis(500)),
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
+            .peek((k, v) -> System.out.println(String.format("JOINED key=%s value=%s", k, v)))
+            .to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties config = new Properties();
+
+
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
+        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+
+        config.putAll(streamsProperties);
+
+        final Topology topology = builder.build(config);
+        final KafkaStreams streams = new KafkaStreams(topology, config);
+
+
+        streams.setStateListener((oldState, newState) -> {
+            if (oldState == State.REBALANCING && newState == State.RUNNING) {
+                final int repartitionTopicCount = getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern);
+                System.out.println(String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
+                System.out.flush();
+            }
+        });
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close(Duration.ofMillis(5000));
+                System.out.println("OPTIMIZE_TEST Streams Stopped");
+                System.out.flush();
+            }
+        });
+
+    }
+
+    private static int getCountOfRepartitionTopicsFound(final String topologyString,
+                                                        final Pattern repartitionTopicPattern) {
+        final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            final String repartitionTopic = matcher.group();
+            System.out.println(String.format("REPARTITION TOPIC found -> %s", 
repartitionTopic));
+            repartitionTopicsFound.add(repartitionTopic);
+        }
+        return repartitionTopicsFound.size();
+    }
+}
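
Note: the state-listener check above keys off Topology#describe(). Each
repartition topic appears in the printed description as a sink node named
"<something>-repartition", so counting regex matches is a cheap proxy for
counting repartition topics. A standalone sketch of the same idea (the
"topology" variable stands for any built Topology, as above):

    final Pattern pattern = Pattern.compile("Sink: .*-repartition");
    final Matcher matcher = pattern.matcher(topology.describe().toString());
    int count = 0;
    while (matcher.find()) {
        count++;                 // one match per repartition sink node
    }
    // expect 4 before optimization, 1 after
    System.out.println("repartition topic count: " + count);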
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 67e6f02..fb43dfb 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -424,6 +424,32 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
                                                         configs)
 
 
+class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsOptimizedUpgradeTestService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsOptimizedTest",
+                                                                 "")
+        self.OPTIMIZED_CONFIG = 'none'
+        self.INPUT_TOPIC = None
+        self.AGGREGATION_TOPIC = None
+        self.REDUCE_TOPIC = None
+        self.JOIN_TOPIC = None
+
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
+
+        properties['topology.optimization'] = self.OPTIMIZED_CONFIG
+        properties['input.topic'] = self.INPUT_TOPIC
+        properties['aggregation.topic'] = self.AGGREGATION_TOPIC
+        properties['reduce.topic'] = self.REDUCE_TOPIC
+        properties['join.topic'] = self.JOIN_TOPIC
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
+
+
 class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
new file mode 100644
index 0000000..e380aee
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsOptimizedTest(Test):
+    """
+    Tests upgrading a Kafka Streams application
+    from an un-optimized topology to an optimized one
+    """
+
+    input_topic = 'inputTopic'
+    aggregation_topic = 'aggregationTopic'
+    reduce_topic = 'reduceTopic'
+    join_topic = 'joinTopic'
+    operation_pattern = 'AGGREGATED\|REDUCED\|JOINED'
+
+    def __init__(self, test_context):
+        super(StreamsOptimizedTest, self).__init__(test_context)
+        self.topics = {
+            self.input_topic: {'partitions': 6},
+            self.aggregation_topic: {'partitions': 6},
+            self.reduce_topic: {'partitions': 6},
+            self.join_topic: {'partitions': 6}
+        }
+
+        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                  zk=self.zookeeper, topics=self.topics)
+
+        self.producer = VerifiableProducer(self.test_context,
+                                           1,
+                                           self.kafka,
+                                           self.input_topic,
+                                           throughput=1000,
+                                           acks=1)
+
+    def test_upgrade_optimized_topology(self):
+        self.zookeeper.start()
+        self.kafka.start()
+
+        processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
+        processor2 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
+        processor3 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
+
+        processors = [processor1, processor2, processor3]
+
+        # produce records continually during the test
+        self.producer.start()
+
+        # start all processors unoptimized
+        for processor in processors:
+            self.set_topics(processor)
+            processor.CLEAN_NODE_ENABLED = False
+            self.verify_running_repartition_topic_count(processor, 4)
+
+        self.verify_processing(processors, verify_individual_operations=False)
+
+        self.stop_processors(processors)
+
+        # start again with topology optimized
+        for processor in processors:
+            processor.OPTIMIZED_CONFIG = 'all'
+            self.verify_running_repartition_topic_count(processor, 1)
+
+        self.verify_processing(processors, verify_individual_operations=True)
+
+        self.stop_processors(processors)
+
+        self.producer.stop()
+        self.kafka.stop()
+        self.zookeeper.stop()
+
+    @staticmethod
+    def verify_running_repartition_topic_count(processor, repartition_topic_count):
+        node = processor.node
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            processor.start()
+            monitor.wait_until('REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' % repartition_topic_count,
+                               timeout_sec=60,
+                               err_msg="Never saw 'REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' message " % repartition_topic_count + str(processor.node.account))
+
+    @staticmethod
+    def verify_stopped(processor):
+        node = processor.node
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            processor.stop()
+            monitor.wait_until('OPTIMIZE_TEST Streams Stopped',
+                               timeout_sec=60,
+                               err_msg="Never saw 'OPTIMIZE_TEST Streams Stopped' message " + str(processor.node.account))
+
+    def verify_processing(self, processors, verify_individual_operations):
+        for processor in processors:
+            if not self.all_source_subtopology_tasks(processor):
+                if verify_individual_operations:
+                    for operation in self.operation_pattern.split('\|'):
+                        self.do_verify(processor, operation)
+                else:
+                    self.do_verify(processor, self.operation_pattern)
+            else:
+                self.logger.info("Skipping processor %s with all source tasks" 
% processor.node.account)
+
+    def do_verify(self, processor, pattern):
+        self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % 
pattern)
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
monitor:
+            monitor.wait_until(pattern,
+                               timeout_sec=60,
+                               err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
+
+    def all_source_subtopology_tasks(self, processor):
+        retries = 0
+        while retries < 5:
+            found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE, allow_fail=True))
+            self.logger.info("Returned %s from assigned task check" % found)
+            if len(found) > 0:
+                return True
+            retries += 1
+            time.sleep(1)
+
+        return False
+
+    def stop_processors(self, processors):
+        for processor in processors:
+            self.verify_stopped(processor)
+
+    def set_topics(self, processor):
+        processor.INPUT_TOPIC = self.input_topic
+        processor.AGGREGATION_TOPIC = self.aggregation_topic
+        processor.REDUCE_TOPIC = self.reduce_topic
+        processor.JOIN_TOPIC = self.join_topic
