This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c46a07f MINOR: Fixes for broker down test stability 2.1 (#6042)
c46a07f is described below

commit c46a07f1464acda109d6d4d0f63f52bd29f2a7f7
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Wed Dec 19 20:52:01 2018 -0500

MINOR: Fixes for broker down test stability 2.1 (#6042)

This PR addresses a few causes of flakiness in this system test. It is a cherry-picked duplicate of #6041, but for the 2.1 branch, hence I won't repeat the inline comments here.

1. The test needs to grab the log monitor before performing a given operation, so that the log output signalling the result of that operation is observed.
2. The test relied too much on a timely rebalance and sent only a handful of messages.

I've updated the test and ran it here: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2142/ (parameterized for 15 repeats; all runs passed).

Reviewers: Guozhang Wang <wangg...@gmail.com>
--- tests/kafkatest/tests/streams/base_streams_test.py | 1 + .../streams/streams_broker_down_resilience_test.py | 222 +++++++++++++++------ 2 files changed, 158 insertions(+), 65 deletions(-) diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py index 320d4b2..6e005dd 100644 --- a/tests/kafkatest/tests/streams/base_streams_test.py +++ b/tests/kafkatest/tests/streams/base_streams_test.py @@ -45,6 +45,7 @@ class BaseStreamsTest(KafkaTest): topic, max_messages=num_messages, acks=1, + throughput=1000, repeating_keys=repeating_keys) def assert_produce_consume(self, diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py index 3cbf713..ee5feae 100644 --- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py @@ -27,7 +27,9 @@ class StreamsBrokerDownResilience(BaseStreamsTest): inputTopic = "streamsResilienceSource" outputTopic = "streamsResilienceSink" 
client_id = "streams-broker-resilience-verify-consumer" - num_messages = 5 + num_messages = 10000 + message = "processed[0-9]*messages" + connected_message = "Discovered group coordinator" def __init__(self, test_context): super(StreamsBrokerDownResilience, self).__init__(test_context, @@ -48,8 +50,6 @@ class StreamsBrokerDownResilience(BaseStreamsTest): processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs()) processor.start() - # until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running before taking the broker down - # After KIP-91 is merged we'll continue to send messages the duration of the test self.assert_produce_consume(self.inputTopic, self.outputTopic, self.client_id, @@ -61,7 +61,11 @@ class StreamsBrokerDownResilience(BaseStreamsTest): time.sleep(broker_down_time_in_seconds) - self.kafka.start_node(node) + with processor.node.account.monitor_log(processor.LOG_FILE) as monitor: + self.kafka.start_node(node) + monitor.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw output '%s' on " % self.connected_message) + str(processor.node.account)) self.assert_produce_consume(self.inputTopic, self.outputTopic, @@ -95,22 +99,45 @@ class StreamsBrokerDownResilience(BaseStreamsTest): self.wait_for_verification(processor_2, broker_unavailable_message, processor_2.LOG_FILE, 10) self.wait_for_verification(processor_3, broker_unavailable_message, processor_3.LOG_FILE, 10) - # now start broker - self.kafka.start_node(node) - - # assert streams can process when starting with broker down - self.assert_produce_consume(self.inputTopic, - self.outputTopic, - self.client_id, - "running_with_broker_down_initially", - num_messages=9, - timeout_sec=120) - - message = "processed3messages" - # need to show all 3 instances processed messages - self.wait_for_verification(processor, message, processor.STDOUT_FILE) - self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE) - 
self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE) + with processor.node.account.monitor_log(processor.LOG_FILE) as monitor_1: + with processor_2.node.account.monitor_log(processor_2.LOG_FILE) as monitor_2: + with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor_3: + self.kafka.start_node(node) + + monitor_1.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + str(processor.node.account)) + monitor_2.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_2.node.account)) + monitor_3.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_3.node.account)) + + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1: + with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: + with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + + self.assert_produce(self.inputTopic, + "sending_message_after_broker_down_initially", + num_messages=self.num_messages, + timeout_sec=120) + + monitor_1.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account)) + monitor_2.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account)) + monitor_3.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account)) + + self.assert_consume(self.client_id, + "consuming_message_after_broker_down_initially", + self.outputTopic, + num_messages=self.num_messages, + timeout_sec=120) self.kafka.stop() @@ -126,24 +153,40 @@ class StreamsBrokerDownResilience(BaseStreamsTest): processor_2.start() processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) - processor_3.start() 
# need to wait for rebalance once - self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE) - - # assert streams can process when starting with broker up - self.assert_produce_consume(self.inputTopic, - self.outputTopic, - self.client_id, - "waiting for rebalance to complete", - num_messages=9, - timeout_sec=120) - - message = "processed3messages" - - self.wait_for_verification(processor, message, processor.STDOUT_FILE) - self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE) - self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE) + rebalance = "State transition from REBALANCING to RUNNING" + with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor: + processor_3.start() + + monitor.wait_until(rebalance, + timeout_sec=120, + err_msg=("Never saw output '%s' on " % rebalance) + str(processor_3.node.account)) + + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1: + with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: + with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + + self.assert_produce(self.inputTopic, + "sending_message_normal_broker_start", + num_messages=self.num_messages, + timeout_sec=120) + + monitor_1.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account)) + monitor_2.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account)) + monitor_3.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account)) + + self.assert_consume(self.client_id, + "consuming_message_normal_broker_start", + self.outputTopic, + num_messages=self.num_messages, + timeout_sec=120) node = self.kafka.leader(self.inputTopic) self.kafka.stop_node(node) @@ -155,17 +198,20 @@ class 
StreamsBrokerDownResilience(BaseStreamsTest): self.wait_for_verification(processor, shutdown_message, processor.STDOUT_FILE) self.wait_for_verification(processor_2, shutdown_message, processor_2.STDOUT_FILE) - self.kafka.start_node(node) + with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor_3: + self.kafka.start_node(node) + + monitor_3.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_3.node.account)) self.assert_produce_consume(self.inputTopic, self.outputTopic, self.client_id, "sending_message_after_stopping_streams_instance_bouncing_broker", - num_messages=9, + num_messages=self.num_messages, timeout_sec=120) - self.wait_for_verification(processor_3, "processed9messages", processor_3.STDOUT_FILE) - self.kafka.stop() def test_streams_should_failover_while_brokers_down(self): @@ -180,24 +226,40 @@ class StreamsBrokerDownResilience(BaseStreamsTest): processor_2.start() processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) - processor_3.start() # need to wait for rebalance once - self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE) - - # assert streams can process when starting with broker up - self.assert_produce_consume(self.inputTopic, - self.outputTopic, - self.client_id, - "waiting for rebalance to complete", - num_messages=9, - timeout_sec=120) - - message = "processed3messages" - - self.wait_for_verification(processor, message, processor.STDOUT_FILE) - self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE) - self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE) + rebalance = "State transition from REBALANCING to RUNNING" + with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor: + processor_3.start() + + monitor.wait_until(rebalance, + timeout_sec=120, + err_msg=("Never saw output '%s' on " % rebalance) + 
str(processor_3.node.account)) + + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1: + with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: + with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + + self.assert_produce(self.inputTopic, + "sending_message_after_normal_broker_start", + num_messages=self.num_messages, + timeout_sec=120) + + monitor_1.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account)) + monitor_2.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account)) + monitor_3.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account)) + + self.assert_consume(self.client_id, + "consuming_message_after_normal_broker_start", + self.outputTopic, + num_messages=self.num_messages, + timeout_sec=120) node = self.kafka.leader(self.inputTopic) self.kafka.stop_node(node) @@ -206,13 +268,43 @@ class StreamsBrokerDownResilience(BaseStreamsTest): processor_2.abortThenRestart() processor_3.abortThenRestart() - self.kafka.start_node(node) - - self.assert_produce_consume(self.inputTopic, - self.outputTopic, - self.client_id, - "sending_message_after_hard_bouncing_streams_instance_bouncing_broker", - num_messages=9, - timeout_sec=120) - + with processor.node.account.monitor_log(processor.LOG_FILE) as monitor_1: + with processor_2.node.account.monitor_log(processor_2.LOG_FILE) as monitor_2: + with processor_3.node.account.monitor_log(processor_3.LOG_FILE) as monitor_3: + self.kafka.start_node(node) + + monitor_1.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + str(processor.node.account)) + monitor_2.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + 
str(processor_2.node.account)) + monitor_3.wait_until(self.connected_message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.connected_message) + str(processor_3.node.account)) + + with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor_1: + with processor_2.node.account.monitor_log(processor_2.STDOUT_FILE) as monitor_2: + with processor_3.node.account.monitor_log(processor_3.STDOUT_FILE) as monitor_3: + + self.assert_produce(self.inputTopic, + "sending_message_after_hard_bouncing_streams_instance_bouncing_broker", + num_messages=self.num_messages, + timeout_sec=120) + + monitor_1.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor.node.account)) + monitor_2.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_2.node.account)) + monitor_3.wait_until(self.message, + timeout_sec=120, + err_msg=("Never saw '%s' on " % self.message) + str(processor_3.node.account)) + + self.assert_consume(self.client_id, + "consuming_message_after_stopping_streams_instance_bouncing_broker", + self.outputTopic, + num_messages=self.num_messages, + timeout_sec=120) self.kafka.stop()