Repository: kafka
Updated Branches:
  refs/heads/0.10.0 2e078d9d8 -> dca78b586


KAFKA-3694; Ensure broker Zk deregistration prior to restart in ReplicationTest

Author: Jason Gustafson <[email protected]>

Reviewers: Geoff Anderson <[email protected]>, Ismael Juma <[email protected]>

Closes #1365 from hachikuji/KAFKA-3694

(cherry picked from commit f892f0ca6d38cb21a93c2c05dd8b9a23c4165181)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dca78b58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dca78b58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dca78b58

Branch: refs/heads/0.10.0
Commit: dca78b586fd2561709793f163c7c3f05e194d768
Parents: 2e078d9
Author: Jason Gustafson <[email protected]>
Authored: Wed May 11 23:48:46 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed May 11 23:49:03 2016 +0100

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py             | 16 +++++++++++++++-
 .../services/kafka/templates/kafka.properties       |  1 +
 tests/kafkatest/tests/core/replication_test.py      | 11 ++++-------
 tests/kafkatest/tests/produce_consume_validate.py   |  2 +-
 4 files changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dca78b58/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 334069d..a843a12 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -66,7 +66,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
-                 jmx_attributes=[], zk_connect_timeout=5000):
+                 jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000):
         """
         :type context
         :type zk: ZookeeperService
@@ -99,6 +99,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # for this constructor.
         self.zk_connect_timeout = zk_connect_timeout
 
+        # Also allow the session timeout to be provided explicitly,
+        # primarily so that test cases can depend on it when waiting
+        # e.g. brokers to deregister after a hard kill.
+        self.zk_session_timeout = zk_session_timeout
+
         self.port_mappings = {
             'PLAINTEXT': Port('PLAINTEXT', 9092, False),
             'SSL': Port('SSL', 9093, False),
@@ -513,6 +518,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Controller's ID: %d" % (controller_idx))
         return self.get_node(controller_idx)
 
+    def is_registered(self, node):
+        """
+        Check whether a broker is registered in Zookeeper
+        """
+        self.logger.debug("Querying zookeeper to see if broker %s is registered", node)
+        broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node))
+        self.logger.debug("Broker info: %s", broker_info)
+        return broker_info is not None
+
     def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
         node = self.nodes[0]
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dca78b58/tests/kafkatest/services/kafka/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 1e4f17c..1f23713 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -72,6 +72,7 @@ zookeeper.set.acl={{zk_set_acl}}
 {% endif %}
 
 zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
+zookeeper.session.timeout.ms={{ zk_session_timeout }}
 
 {% if replica_lag is defined %}
 replica.lag.time.max.ms={{replica_lag}}

http://git-wip-us.apache.org/repos/asf/kafka/blob/dca78b58/tests/kafkatest/tests/core/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index 8e9474a..f815034 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -65,15 +65,12 @@ def hard_bounce(test, broker_type):
         test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
 
         # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper and the broker cluster have registered the loss of the leader/controller.
-        # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
+        # zookeeper has registered the loss by expiring the broker's session timeout.
 
-        def role_reassigned():
-            current_elected_broker = broker_node(test, broker_type)
-            return current_elected_broker is not None and current_elected_broker != prev_broker_node
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+                   timeout_sec=test.kafka.zk_session_timeout + 5,
+                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
 
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
-        wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
         test.kafka.start_node(prev_broker_node)
 
 failures = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dca78b58/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 425b816..a5da7be 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -35,7 +35,7 @@ class ProduceConsumeValidateTest(Test):
     def start_producer_and_consumer(self):
         # Start background producer and consumer
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20,
              err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
         wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,

Reply via email to