This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push: new 325816b MINOR: fix Streams version-probing system test (#6764) 325816b is described below commit 325816b534b016b9f2262a8b6f2792836f0659b5 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Fri May 24 08:08:52 2019 -0700 MINOR: fix Streams version-probing system test (#6764) Reviewers: John Roesler <j...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>, Boyang Chen <boy...@confluent.io> --- .../kafka/streams/tests/StreamsUpgradeTest.java | 20 ++++++++------------ .../kafkatest/tests/streams/streams_upgrade_test.py | 7 +++---- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 33e9b97..27bee81 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -47,7 +47,6 @@ import java.io.IOException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -90,16 +89,13 @@ public class StreamsUpgradeTest { final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.out.println("closing Kafka Streams instance"); - System.out.flush(); - streams.close(); - System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); - System.out.flush(); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + })); } private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier { @@ -168,7 +164,7 @@ public class StreamsUpgradeTest { assignment.userData().putInt(0, AssignmentInfo.LATEST_SUPPORTED_VERSION)); final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions()); - Collections.sort(partitions, PARTITION_COMPARATOR); + partitions.sort(PARTITION_COMPARATOR); // version 1 field final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 8b62977..4cb8bb6 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -70,6 +70,7 @@ which are outlined here: """ + class StreamsUpgradeTest(Test): """ Test upgrading Kafka Streams (all version combination) @@ -348,7 +349,7 @@ class StreamsUpgradeTest(Test): while retries > 0: for p in self.processors: found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)) - if len(found) == self.leader_counter[p] + 1: + if len(found) >= self.leader_counter[p] + 1: if self.leader is not None: raise Exception("Could not uniquely identify leader") self.leader = p @@ -403,8 +404,7 @@ class StreamsUpgradeTest(Test): timeout_sec=60, err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - - # start third with <version> + # start third with <version> self.prepare_for(self.processor3, version) node3 = self.processor3.node with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: @@ -425,7 +425,6 @@ class StreamsUpgradeTest(Test): timeout_sec=60, err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account)) - @staticmethod def prepare_for(processor, version): processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)