Repository: kafka
Updated Branches:
  refs/heads/trunk 91517e8fb -> 51063441d
KAFKA-5362; Follow up to Streams EOS system test

- improve tests to get rid of calls to `sleep` in Python
- fixed some flaky test conditions
- improve debugging

Author: Matthias J. Sax <[email protected]>
Reviewers: Damian Guy <[email protected]>, Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
Closes #3542 from mjsax/failing-eos-system-tests

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51063441
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51063441
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51063441

Branch: refs/heads/trunk
Commit: 51063441d3ed4dec8a96794f085028b4b8feb20c
Parents: 91517e8
Author: Matthias J. Sax <[email protected]>
Authored: Fri Oct 6 17:48:34 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Oct 6 17:48:34 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/tests/EosTestClient.java      |  19 ++-
 .../kafka/streams/tests/EosTestDriver.java      |  77 +++++++++--
 .../kafka/streams/tests/SmokeTestUtil.java      |   1 +
 .../kafka/streams/tests/StreamsEosTest.java     |   3 +
 tests/kafkatest/services/streams.py             |   2 +-
 .../kafkatest/tests/streams/streams_eos_test.py | 133 ++++++++++++-------
 6 files changed, 166 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
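The core of the sleep removal is a handshake between the Java test clients and the Python system tests: the clients print timestamped, immediately flushed stdout markers (state transitions, processing progress, shutdown and exception markers), and the ducktape tests wait for those markers with log monitors instead of sleeping for a fixed 120 seconds. Below is a distilled sketch of the marker-emitting side, mirroring what the EosTestClient diff below adds; the class and helper names here are illustrative and not part of the commit.

    import org.apache.kafka.streams.KafkaStreams;

    public final class StateMarkerExample {

        // Print a timestamp plus a grep-able marker and flush right away, so the line
        // reaches the node's stdout log even if the JVM is killed immediately afterwards.
        static void printMarker(final String marker) {
            System.out.println(System.currentTimeMillis());
            System.out.println(marker);
            System.out.flush();
        }

        // Turn every KafkaStreams state transition into a marker line such as
        // "StateChange: REBALANCING -> RUNNING" that a log monitor can wait for.
        static void registerStateMarkers(final KafkaStreams streams) {
            streams.setStateListener(new KafkaStreams.StateListener() {
                @Override
                public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
                    printMarker("StateChange: " + oldState + " -> " + newState);
                }
            });
        }
    }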
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 751fc97..098b77b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -53,10 +53,12 @@ public class EosTestClient extends SmokeTestUtil {
             @Override
             public void run() {
                 isRunning = false;
-                streams.close(5, TimeUnit.SECONDS);
+                streams.close(TimeUnit.SECONDS.toMillis(300), TimeUnit.SECONDS);
                 // do not remove these printouts since they are needed for health scripts
                 if (!uncaughtException) {
+                    System.out.println(System.currentTimeMillis());
                     System.out.println("EOS-TEST-CLIENT-CLOSED");
+                    System.out.flush();
                 }
             }
         }));
@@ -69,15 +71,26 @@ public class EosTestClient extends SmokeTestUtil {
                 streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                     @Override
                     public void uncaughtException(final Thread t, final Throwable e) {
+                        System.out.println(System.currentTimeMillis());
                         System.out.println("EOS-TEST-CLIENT-EXCEPTION");
                         e.printStackTrace();
+                        System.out.flush();
                         uncaughtException = true;
                     }
                 });
+                streams.setStateListener(new KafkaStreams.StateListener() {
+                    @Override
+                    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
+                        // don't remove this -- it's required test output
+                        System.out.println(System.currentTimeMillis());
+                        System.out.println("StateChange: " + oldState + " -> " + newState);
+                        System.out.flush();
+                    }
+                });
                 streams.start();
             }
             if (uncaughtException) {
-                streams.close(5, TimeUnit.SECONDS);
+                streams.close(TimeUnit.SECONDS.toMillis(60), TimeUnit.SECONDS);
                 streams = null;
             }
             sleep(1000);
@@ -90,7 +103,7 @@ public class EosTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 0c1e16b..7c7485d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -54,15 +55,23 @@ import java.util.Set;
 public class EosTestDriver extends SmokeTestUtil {
 
     private static final int MAX_NUMBER_OF_KEYS = 100;
-    private static final long MAX_IDLE_TIME_MS = 300000L;
+    private static final long MAX_IDLE_TIME_MS = 600000L;
 
     private static boolean isRunning = true;
 
+    static int numRecordsProduced = 0;
+
+    static synchronized void updateNumRecordsProduces(final int delta) {
+        numRecordsProduced += delta;
+    }
+
     static void generate(final String kafka) {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
+                System.out.println("Terminating");
+                System.out.flush();
                 isRunning = false;
             }
         });
@@ -78,7 +87,6 @@ public class EosTestDriver extends SmokeTestUtil {
 
         final Random rand = new Random(System.currentTimeMillis());
 
-        int numRecordsProduced = 0;
         while (isRunning) {
             final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
             final int value = rand.nextInt(10000);
@@ -89,20 +97,47 @@ public class EosTestDriver extends SmokeTestUtil {
                 @Override
                 public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                     if (exception != null) {
-                        exception.printStackTrace();
-                        Exit.exit(1);
+                        exception.printStackTrace(System.err);
+                        System.err.flush();
+                        if (exception instanceof TimeoutException) {
+                            try {
+                                // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
+                                final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
+                                updateNumRecordsProduces(-expired);
+                            } catch (Exception ignore) { }
+                        }
                     }
                 }
             });
 
-            numRecordsProduced++;
+            updateNumRecordsProduces(1);
             if (numRecordsProduced % 1000 == 0) {
                 System.out.println(numRecordsProduced + " records produced");
+                System.out.flush();
             }
-            Utils.sleep(rand.nextInt(50));
+            Utils.sleep(rand.nextInt(10));
         }
         producer.close();
-        System.out.println(numRecordsProduced + " records produced");
+        System.out.println("Producer closed: " + numRecordsProduced + " records produced");
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
+            System.out.println("Partitions: " + partitions);
+            consumer.assign(partitions);
+            consumer.seekToEnd(partitions);
+
+            for (final TopicPartition tp : partitions) {
+                System.out.println("End-offset for " + tp + " is " + consumer.position(tp));
+            }
+        }
+        System.out.flush();
     }
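The new producer callback keeps the produced-record count honest when records expire: on a TimeoutException it parses the number of expired records out of the exception text (the expected format is quoted in the comment above) and subtracts it via updateNumRecordsProduces, swallowing any parse failure since the message wording is not a stable API. A standalone sketch of that parsing step follows; the class name and sample string are illustrative only, not taken from the commit.

    public final class ExpiredRecordCountExample {

        // The count is the third whitespace-separated token of the full exception string,
        // e.g. "org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: ...".
        // Any unexpected shape is treated as "nothing expired".
        static int expiredRecords(final String exceptionString) {
            try {
                return Integer.parseInt(exceptionString.split(" ")[2]);
            } catch (final Exception ignore) {
                return 0;
            }
        }

        public static void main(final String[] args) {
            final String msg = "org.apache.kafka.common.errors.TimeoutException: "
                + "Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time";
            System.out.println(expiredRecords(msg));   // prints 4
        }
    }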
 
     public static void verify(final String kafka, final boolean withRepartitioning) {
@@ -180,6 +215,7 @@
         // do not modify: required test output
         System.out.println("ALL-RECORDS-DELIVERED");
+        System.out.flush();
     }
 
     private static void ensureStreamsApplicationDown(final String kafka) {
@@ -190,7 +226,7 @@
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
             if (System.currentTimeMillis() > maxWaitTime) {
-                throw new RuntimeException("Streams application not down after 30 seconds.");
+                throw new RuntimeException("Streams application not down after " + (MAX_IDLE_TIME_MS / 1000) + " seconds.");
             }
             sleep(1000);
         }
@@ -240,16 +276,20 @@
                                                        final Map<TopicPartition, Long> readEndOffsets,
                                                        final boolean withRepartitioning,
                                                        final boolean isInputTopic) {
+        System.err.println("read end offset: " + readEndOffsets);
         final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
+        final Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
+        final Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();
 
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
 
             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
 
                 final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+                maxReceivedOffsetPerPartition.put(tp, record.offset());
                 final long readEndOffset = readEndOffsets.get(tp);
                 if (record.offset() < readEndOffset) {
                     addRecord(record, recordPerTopicPerPartition, withRepartitioning);
@@ -257,7 +297,11 @@
                     throw new RuntimeException("FAIL: did receive more records than expected for " + tp + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
                 }
-                if (consumer.position(tp) >= readEndOffset) {
+            }
+
+            for (final TopicPartition tp : readEndOffsets.keySet()) {
+                maxConsumerPositionPerPartition.put(tp, consumer.position(tp));
+                if (consumer.position(tp) >= readEndOffsets.get(tp)) {
                     consumer.pause(Collections.singletonList(tp));
                 }
             }
@@ -266,7 +310,10 @@
             }
         }
         if (!allRecordsReceived) {
-            throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
+            System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
+            System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
+            System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
+            throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000) + " sec idle time.");
         }
 
         return recordPerTopicPerPartition;
     }
@@ -530,7 +577,8 @@
                 @Override
                 public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                     if (exception != null) {
-                        exception.printStackTrace();
+                        exception.printStackTrace(System.err);
+                        System.err.flush();
                         Exit.exit(1);
                     }
                 }
@@ -540,10 +588,11 @@
 
         final StringDeserializer stringDeserializer = new StringDeserializer();
 
-        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
             final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
             for (final ConsumerRecord<byte[], byte[]> record : records) {
+                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                 final String topic = record.topic();
                 final TopicPartition tp = new TopicPartition(topic, record.partition());
@@ -564,7 +613,7 @@
             }
         }
         if (!partitions.isEmpty()) {
-            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last 30 sec.");
+            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000) + " sec.");
         }
     }
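The reworked readRecords drains the output topics deterministically: it takes the expected end offsets up front, polls in a loop, pauses each partition once the consumer's position reaches its end offset, and pushes the idle deadline forward whenever a record arrives, so verification finishes as soon as the data does instead of after a fixed wait. Below is a compact, self-contained sketch of that pattern, assuming the consumer is already assigned to the partitions in question; the names and poll interval are illustrative.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public final class ReadToEndExample {

        // Drain every record below the given end offsets from an already assigned consumer.
        // Partitions are paused as soon as the consumer's position reaches their end offset,
        // and the idle deadline is reset on every received record, so the loop ends as soon
        // as the data does -- or after maxIdleMs without progress.
        static List<ConsumerRecord<byte[], byte[]>> readToEnd(final KafkaConsumer<byte[], byte[]> consumer,
                                                              final Map<TopicPartition, Long> endOffsets,
                                                              final long maxIdleMs) {
            final List<ConsumerRecord<byte[], byte[]>> result = new ArrayList<>();
            final Set<TopicPartition> completed = new HashSet<>();
            long deadline = System.currentTimeMillis() + maxIdleMs;

            while (completed.size() < endOffsets.size() && System.currentTimeMillis() < deadline) {
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    deadline = System.currentTimeMillis() + maxIdleMs;  // progress: reset the idle timer
                    final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    if (record.offset() < endOffsets.get(tp)) {
                        result.add(record);
                    }
                }
                for (final TopicPartition tp : endOffsets.keySet()) {
                    if (!completed.contains(tp) && consumer.position(tp) >= endOffsets.get(tp)) {
                        consumer.pause(Collections.singletonList(tp));
                        completed.add(tp);
                    }
                }
            }
            return result;
        }
    }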
http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index fc808e7..dc4c91b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -60,6 +60,7 @@ public class SmokeTestUtil {
                 }
                 numRecordsProcessed++;
                 if (numRecordsProcessed % 100 == 0) {
+                    System.out.println(System.currentTimeMillis());
                     System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
index 27fdc2d..4921143 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
@@ -33,6 +33,7 @@ public class StreamsEosTest {
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
         System.out.println("command=" + command);
+        System.out.flush();
 
         if (command == null || stateDir == null) {
             System.exit(-1);
@@ -56,6 +57,8 @@ public class StreamsEosTest {
                 break;
             default:
                 System.out.println("unknown command: " + command);
+                System.out.flush();
+                System.exit(-1);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/tests/kafkatest/services/streams.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a0d9c57..3719feb 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -135,7 +135,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
+            monitor.wait_until('StreamsTest instance started', timeout_sec=60, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")

http://git-wip-us.apache.org/repos/asf/kafka/blob/51063441/tests/kafkatest/tests/streams/streams_eos_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 60dc4b1..0863e25 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -19,7 +19,6 @@ from ducktape.mark import ignore
 from kafkatest.tests.kafka_test import KafkaTest
 from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
     StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
-import time
 
 
 class StreamsEosTest(KafkaTest):
@@ -29,106 +28,138 @@ class StreamsEosTest(KafkaTest):
 
     def __init__(self, test_context):
         super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
-            'data' : { 'partitions': 5, 'replication-factor': 2 },
-            'echo' : { 'partitions': 5, 'replication-factor': 2 },
-            'min' : { 'partitions': 5, 'replication-factor': 2 },
-            'sum' : { 'partitions': 5, 'replication-factor': 2 },
-            'repartition' : { 'partitions': 5, 'replication-factor': 2 },
-            'max' : { 'partitions': 5, 'replication-factor': 2 },
-            'cnt' : { 'partitions': 5, 'replication-factor': 2 }
+            'data': {'partitions': 5, 'replication-factor': 2},
+            'echo': {'partitions': 5, 'replication-factor': 2},
+            'min': {'partitions': 5, 'replication-factor': 2},
+            'sum': {'partitions': 5, 'replication-factor': 2},
+            'repartition': {'partitions': 5, 'replication-factor': 2},
+            'max': {'partitions': 5, 'replication-factor': 2},
+            'cnt': {'partitions': 5, 'replication-factor': 2}
         })
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_rebalance_simple(self):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
    def test_rebalance_complex(self):
         self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                            StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    def run_rebalance(self, processor1, processor2, verifier):
+    def run_rebalance(self, processor1, processor2, processor3, verifier):
         """
         Starts and stops two test clients a few times.
         Ensure that all records are delivered exactly-once.
         """
         self.driver.start()
 
-        processor1.start()
-
-        time.sleep(120)
-
-        processor2.start()
-        time.sleep(120)
-        processor1.stop()
-
-        time.sleep(120)
-        processor1.start()
-
-        time.sleep(120)
-        processor2.stop()
-
-        time.sleep(120)
+        self.add_streams(processor1)
+        self.add_streams2(processor1, processor2)
+        self.add_streams3(processor1, processor2, processor3)
+        self.stop_streams3(processor2, processor3, processor1)
+        self.add_streams3(processor2, processor3, processor1)
+        self.stop_streams3(processor1, processor3, processor2)
+        self.stop_streams2(processor1, processor3)
+        self.stop_streams(processor1)
 
         self.driver.stop()
 
-        processor1.stop()
-        processor2.stop()
-
         verifier.start()
         verifier.wait()
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_failure_and_recovery(self):
         self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    @ignore
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_failure_and_recovery_complex(self):
         self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
+                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
                                       StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka))
 
-    def run_failure_and_recovery(self, processor1, processor2, verifier):
+    def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
         """
         Starts two test clients, then abort (kill -9) and restart them a few times.
         Ensure that all records are delivered exactly-once.
         """
         self.driver.start()
 
-        processor1.start()
-        processor2.start()
-
-        time.sleep(120)
-        processor1.abortThenRestart()
-
-        time.sleep(120)
-        processor1.abortThenRestart()
-        time.sleep(120)
-        processor2.abortThenRestart()
-
-        time.sleep(120)
+        self.add_streams(processor1)
+        self.add_streams2(processor1, processor2)
+        self.add_streams3(processor1, processor2, processor3)
+        self.abort_streams(processor2, processor3, processor1)
+        self.add_streams3(processor2, processor3, processor1)
+        self.abort_streams(processor2, processor3, processor1)
+        self.add_streams3(processor2, processor3, processor1)
+        self.abort_streams(processor1, processor3, processor2)
+        self.stop_streams2(processor1, processor3)
+        self.stop_streams(processor1)
 
         self.driver.stop()
 
-        processor1.stop()
-        processor2.stop()
-
         verifier.start()
         verifier.wait()
 
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
+
+    def add_streams(self, processor):
+        processor.start()
+        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            self.wait_for_startup(monitor, processor)
+
+    def add_streams2(self, running_processor, processor_to_be_started):
+        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
+            self.add_streams(processor_to_be_started)
+            self.wait_for_startup(monitor, running_processor)
+
+    def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
+        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
+            self.add_streams2(running_processor2, processor_to_be_started)
+            self.wait_for_startup(monitor, running_processor1)
+
+    def stop_streams(self, processor_to_be_stopped):
+        with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
+            processor_to_be_stopped.stop()
+            self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
+
+    def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
+        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
+            self.stop_streams(processor_to_be_stopped)
+            self.wait_for_startup(monitor, keep_alive_processor)
+
+    def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
+            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+            self.wait_for_startup(monitor, keep_alive_processor1)
+
+    def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
+            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
+                processor_to_be_aborted.stop_nodes(False)
+                self.wait_for_startup(monitor2, keep_alive_processor2)
+                self.wait_for_startup(monitor1, keep_alive_processor1)
+
+    def wait_for_startup(self, monitor, processor):
+        self.wait_for(monitor, processor, "StateChange: RUNNING -> REBALANCING")
+        self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
+        self.wait_for(monitor, processor, "processed 500 records from topic=data")
+
+    def wait_for(self, monitor, processor, output):
+        monitor.wait_until(output,
+                           timeout_sec=300,
+                           err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))
