This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new dfd5454  MINOR: Add system test for optimization upgrades (#5912)
dfd5454 is described below

commit dfd545485ab0399e0c129ff2d68fbb3113f3e8c9
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Tue Nov 27 16:07:34 2018 -0500

    MINOR: Add system test for optimization upgrades (#5912)

    This is a new system test for optimizing an existing topology. The test takes the following steps:

    1. Start a Kafka Streams application that uses a selectKey, then performs 3 groupByKey() operations and 1 join, creating four repartition topics
    2. Verify all instances start and process data
    3. Stop all instances and verify they have stopped
    4. For each stopped instance, update the config for TOPOLOGY_OPTIMIZATION to "all", then restart the instance and verify it starts successfully, also verifying that Kafka Streams reduced the number of repartition topics from 4 to 1
    5. Verify that each instance is processing data from the aggregation, reduce, and join operations
    6. Stop all instances and verify the shutdown is complete

    For testing, I ran two passes of the system test with 25 repeats, for a total of 50 test runs. All test runs passed.

    Reviewers: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 .../kafka/streams/tests/StreamsOptimizedTest.java | 156 +++++++++++++++++++++
 tests/kafkatest/services/streams.py               |  26 ++++
 .../tests/streams/streams_optimized_test.py       | 150 ++++++++++++++++++++
 3 files changed, 332 insertions(+)
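For readers skimming the diff that follows: the switch the test flips in step 4 is the topology.optimization property (StreamsConfig.TOPOLOGY_OPTIMIZATION), which accepts "none" (the default) and "all". Below is a minimal, illustrative sketch, not part of this commit, of how an application opts in; the application id, broker address, and topic name are placeholders. Note that the properties have to be handed to StreamsBuilder#build() for the optimizer to rewrite the topology, which is exactly what the new test driver does.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;

    public class OptimizationConfigSketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-config-sketch"); // placeholder id
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder broker
            // StreamsConfig.OPTIMIZE == "all"; the default is StreamsConfig.NO_OPTIMIZATION == "none"
            props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

            final StreamsBuilder builder = new StreamsBuilder();
            // A key-changing operation followed by an aggregation forces a repartition topic.
            builder.<String, String>stream("inputTopic", Consumed.with(Serdes.String(), Serdes.String()))
                   .selectKey((k, v) -> v)
                   .groupByKey()
                   .count(Materialized.with(Serdes.String(), Serdes.Long()));

            // The optimizer only runs when the properties are handed to build();
            // building without them always yields the un-optimized topology.
            final Topology topology = builder.build(props);
            System.out.println(topology.describe());
        }
    }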
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
new file mode 100644
index 0000000..6bde92f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Reducer;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.time.Duration.ofMillis;
+
+public class StreamsOptimizedTest {
+
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println("StreamsOptimizedTest requires one argument (properties-file) but none was provided.");
+        }
+        final String propFileName = args[0];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        System.out.println("StreamsTest instance started StreamsOptimizedTest");
+        System.out.println("props=" + streamsProperties);
+
+        final String inputTopic = (String) Objects.requireNonNull(streamsProperties.remove("input.topic"));
+        final String aggregationTopic = (String) Objects.requireNonNull(streamsProperties.remove("aggregation.topic"));
+        final String reduceTopic = (String) Objects.requireNonNull(streamsProperties.remove("reduce.topic"));
+        final String joinTopic = (String) Objects.requireNonNull(streamsProperties.remove("join.topic"));
+
+        final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
+
+        final Reducer<String> reducer = (v1, v2) -> Integer.toString(Integer.parseInt(v1) + Integer.parseInt(v2));
+
+        final Function<String, String> keyFunction = s -> Integer.toString(Integer.parseInt(s) % 9);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedStream = sourceStream.selectKey((k, v) -> keyFunction.apply(v));
+
+        final KStream<String, Long> countStream = mappedStream.groupByKey()
+                                                              .count(Materialized.with(Serdes.String(), Serdes.Long()))
+                                                              .toStream();
+
+        mappedStream.groupByKey().aggregate(
+            initializer,
+            aggregator,
+            Materialized.with(Serdes.String(), Serdes.Integer()))
+            .toStream()
+            .peek((k, v) -> System.out.println(String.format("AGGREGATED key=%s value=%s", k, v)))
+            .to(aggregationTopic, Produced.with(Serdes.String(), Serdes.Integer()));
+
+        mappedStream.groupByKey()
+            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
+            .toStream()
+            .peek((k, v) -> System.out.println(String.format("REDUCED key=%s value=%s", k, v)))
+            .to(reduceTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        mappedStream.join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
+            JoinWindows.of(ofMillis(500)),
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
+            .peek((k, v) -> System.out.println(String.format("JOINED key=%s value=%s", k, v)))
+            .to(joinTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+        final Properties config = new Properties();
+
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsOptimizedTest");
+        config.setProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        config.putAll(streamsProperties);
+
+        final Topology topology = builder.build(config);
+        final KafkaStreams streams = new KafkaStreams(topology, config);
+
+        streams.setStateListener((newState, oldState) -> {
+            if (oldState == State.REBALANCING && newState == State.RUNNING) {
+                final int repartitionTopicCount = getCountOfRepartitionTopicsFound(topology.describe().toString(), repartitionTopicPattern);
+                System.out.println(String.format("REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%d", repartitionTopicCount));
+                System.out.flush();
+            }
+        });
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close(Duration.ofMillis(5000));
+                System.out.println("OPTIMIZE_TEST Streams Stopped");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static int getCountOfRepartitionTopicsFound(final String topologyString,
+                                                        final Pattern repartitionTopicPattern) {
+        final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            final String repartitionTopic = matcher.group();
+            System.out.println(String.format("REPARTITION TOPIC found -> %s", repartitionTopic));
+            repartitionTopicsFound.add(repartitionTopic);
+        }
+        return repartitionTopicsFound.size();
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 67e6f02..fb43dfb 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -424,6 +424,32 @@ class StreamsStandbyTaskService(StreamsTestBaseService):
                                                           configs)
 
 
+class StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsOptimizedUpgradeTestService, self).__init__(test_context,
+                                                                 kafka,
+                                                                 "org.apache.kafka.streams.tests.StreamsOptimizedTest",
+                                                                 "")
+        self.OPTIMIZED_CONFIG = 'none'
+        self.INPUT_TOPIC = None
+        self.AGGREGATION_TOPIC = None
+        self.REDUCE_TOPIC = None
+        self.JOIN_TOPIC = None
+
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
+
+        properties['topology.optimization'] = self.OPTIMIZED_CONFIG
+        properties['input.topic'] = self.INPUT_TOPIC
+        properties['aggregation.topic'] = self.AGGREGATION_TOPIC
+        properties['reduce.topic'] = self.REDUCE_TOPIC
+        properties['join.topic'] = self.JOIN_TOPIC
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
+
+
 class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsUpgradeTestJobRunnerService, self).__init__(test_context,
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
new file mode 100644
index 0000000..e380aee
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class StreamsOptimizedTest(Test):
+    """
+    Tests upgrading a Kafka Streams application from an un-optimized
+    topology to an optimized one.
+    """
+
+    input_topic = 'inputTopic'
+    aggregation_topic = 'aggregationTopic'
+    reduce_topic = 'reduceTopic'
+    join_topic = 'joinTopic'
+    operation_pattern = 'AGGREGATED\|REDUCED\|JOINED'
+
+    def __init__(self, test_context):
+        super(StreamsOptimizedTest, self).__init__(test_context)
+        self.topics = {
+            self.input_topic: {'partitions': 6},
+            self.aggregation_topic: {'partitions': 6},
+            self.reduce_topic: {'partitions': 6},
+            self.join_topic: {'partitions': 6}
+        }
+
+        self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
+                                  zk=self.zookeeper, topics=self.topics)
+
+        self.producer = VerifiableProducer(self.test_context,
+                                           1,
+                                           self.kafka,
+                                           self.input_topic,
+                                           throughput=1000,
+                                           acks=1)
+
+    def test_upgrade_optimized_topology(self):
+        self.zookeeper.start()
+        self.kafka.start()
+
+        processor1 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
+        processor2 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
+        processor3 = StreamsOptimizedUpgradeTestService(self.test_context, self.kafka)
+
+        processors = [processor1, processor2, processor3]
+
+        # produce records continually during the test
+        self.producer.start()
+
+        # start all processors unoptimized
+        for processor in processors:
+            self.set_topics(processor)
+            processor.CLEAN_NODE_ENABLED = False
+            self.verify_running_repartition_topic_count(processor, 4)
+
+        self.verify_processing(processors, verify_individual_operations=False)
+
+        self.stop_processors(processors)
+
+        # start again with topology optimized
+        for processor in processors:
+            processor.OPTIMIZED_CONFIG = 'all'
+            self.verify_running_repartition_topic_count(processor, 1)
+
+        self.verify_processing(processors, verify_individual_operations=True)
+
+        self.stop_processors(processors)
+
+        self.producer.stop()
+        self.kafka.stop()
+        self.zookeeper.stop()
+
+    @staticmethod
+    def verify_running_repartition_topic_count(processor, repartition_topic_count):
+        node = processor.node
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            processor.start()
+            monitor.wait_until('REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' % repartition_topic_count,
+                               timeout_sec=60,
+                               err_msg="Never saw 'REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' message "
+                                       % repartition_topic_count + str(processor.node.account))
+
+    @staticmethod
+    def verify_stopped(processor):
+        node = processor.node
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            processor.stop()
+            monitor.wait_until('OPTIMIZE_TEST Streams Stopped',
+                               timeout_sec=60,
+                               err_msg="Never saw 'OPTIMIZE_TEST Streams Stopped' message " + str(processor.node.account))
+
+    def verify_processing(self, processors, verify_individual_operations):
+        for processor in processors:
+            if not self.all_source_subtopology_tasks(processor):
+                if verify_individual_operations:
+                    for operation in self.operation_pattern.split('\|'):
+                        self.do_verify(processor, operation)
+                else:
+                    self.do_verify(processor, self.operation_pattern)
+            else:
+                self.logger.info("Skipping processor %s with all source tasks" % processor.node.account)
+
+    def do_verify(self, processor, pattern):
+        self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % pattern)
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            monitor.wait_until(pattern,
+                               timeout_sec=60,
+                               err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
+
+    def all_source_subtopology_tasks(self, processor):
+        retries = 0
+        while retries < 5:
+            found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE,
+                                                            allow_fail=True))
+            self.logger.info("Returned %s from assigned task check" % found)
+            if len(found) > 0:
+                return True
+            retries += 1
+            time.sleep(1)
+
+        return False
+
+    def stop_processors(self, processors):
+        for processor in processors:
+            self.verify_stopped(processor)
+
+    def set_topics(self, processor):
+        processor.INPUT_TOPIC = self.input_topic
+        processor.AGGREGATION_TOPIC = self.aggregation_topic
+        processor.REDUCE_TOPIC = self.reduce_topic
+        processor.JOIN_TOPIC = self.join_topic
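As a closing illustration of the repartition-topic reduction the test asserts, the optimizer's effect can also be observed outside the system-test harness by describing the same logical topology twice, once un-optimized and once optimized. The sketch below is illustrative only and not part of this commit; the application id, broker address, topic name, and the smaller two-aggregation topology are stand-ins for the fuller selectKey / 3 groupByKey / join topology the test builds, so it shows a 2-to-1 rather than a 4-to-1 reduction.

    import java.util.Properties;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;

    public class RepartitionCountSketch {

        public static void main(final String[] args) {
            // Un-optimized: each groupByKey() after the selectKey() gets its own repartition topic.
            System.out.println("unoptimized repartition topics: " + repartitionTopicCount(StreamsConfig.NO_OPTIMIZATION));
            // Optimized ("all"): the key-changed stream is repartitioned once and shared downstream.
            System.out.println("optimized repartition topics:   " + repartitionTopicCount(StreamsConfig.OPTIMIZE));
        }

        private static int repartitionTopicCount(final String optimizationConfig) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> mapped = builder
                .<String, String>stream("inputTopic", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((k, v) -> v);

            // Two stateful operations downstream of the key-changing operation.
            mapped.groupByKey().count(Materialized.with(Serdes.String(), Serdes.Long()));
            mapped.groupByKey().reduce((v1, v2) -> v2, Materialized.with(Serdes.String(), Serdes.String()));

            final Properties props = new Properties();
            props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-count-sketch"); // placeholder id
            props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder broker
            props.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);

            // Same sink-name pattern the test driver greps for in Topology#describe() output.
            final Matcher matcher = Pattern.compile("Sink: .*-repartition")
                                           .matcher(builder.build(props).describe().toString());
            int count = 0;
            while (matcher.find()) {
                count++;
            }
            return count;
        }
    }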