[ 
https://issues.apache.org/jira/browse/KAFKA-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404613#comment-16404613
 ] 

ASF GitHub Bot commented on KAFKA-6676:
---------------------------------------

rajinisivaram closed pull request #4729: KAFKA-6676: Ensure Kafka chroot exists 
in system tests and use chroot on one test with security parameterizations
URL: https://github.com/apache/kafka/pull/4729
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py 
b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 066d6d42c14..537755d5820 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -36,7 +36,7 @@ def __init__(self, test_context):
 
         self.topic = "topic"
         self.zk = ZookeeperService(test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, 
zk_chroot="/kafka",
                                   topics={self.topic: {"partitions": 1, 
"replication-factor": 1}})
         self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, 
kafka=self.kafka, topic=self.topic, new_consumer=False)
 
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index e563ab82dab..c4d4b247557 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -163,6 +163,8 @@ def start(self, add_principals=""):
         self.open_port(self.interbroker_security_protocol)
 
         self.start_minikdc(add_principals)
+        self._ensure_zk_chroot()
+
         Service.start(self)
 
         self.logger.info("Waiting for brokers to register at ZK")
@@ -183,6 +185,16 @@ def start(self, add_principals=""):
                 topic_cfg["topic"] = topic
                 self.create_topic(topic_cfg)
 
+    def _ensure_zk_chroot(self):
+        self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
+        if self.zk_chroot:
+            if not self.zk_chroot.startswith('/'):
+                raise Exception("Zookeeper chroot must start with '/' but 
found " + self.zk_chroot)
+
+            parts = self.zk_chroot.split('/')[1:]
+            for i in range(len(parts)):
+                self.zk.create('/' + '/'.join(parts[:i+1]))
+
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
diff --git a/tests/kafkatest/services/zookeeper.py 
b/tests/kafkatest/services/zookeeper.py
index b181a12210a..5bda867ed7c 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -103,7 +103,7 @@ def stop_node(self, node):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, 
idx, node.account.hostname))
         node.account.kill_java_processes(self.java_class_name(), 
allow_fail=False)
-        node.account.kill_java_processes(self.java_query_class_name(), 
allow_fail=False)
+        node.account.kill_java_processes(self.java_cli_class_name(), 
allow_fail=False)
         wait_until(lambda: not self.alive(node), timeout_sec=5, err_msg="Timed 
out waiting for zookeeper to stop.")
 
     def clean_node(self, node):
@@ -113,7 +113,7 @@ def clean_node(self, node):
                              (self.__class__.__name__, node.account))
         node.account.kill_java_processes(self.java_class_name(),
                                          clean_shutdown=False, allow_fail=True)
-        node.account.kill_java_processes(self.java_query_class_name(),
+        node.account.kill_java_processes(self.java_cli_class_name(),
                                          clean_shutdown=False, 
allow_fail=False)
         node.account.ssh("rm -rf -- %s" % ZookeeperService.ROOT, 
allow_fail=False)
 
@@ -134,18 +134,21 @@ def zookeeper_migration(self, node, zk_acl):
                        (self.path.script("zookeeper-security-migration.sh", 
node), zk_acl, self.connect_setting())
         node.account.ssh(la_migra_cmd)
 
+    def _check_chroot(self, chroot):
+        if chroot and not chroot.startswith("/"):
+            raise Exception("ZK chroot must start with '/', invalid chroot: 
%s" % chroot)
+
     def query(self, path, chroot=None):
         """
         Queries zookeeper for data associated with 'path' and returns all 
fields in the schema
         """
-        if chroot and not chroot.startswith("/"):
-            raise Exception("ZK chroot must start with '/', invalid chroot: 
%s" % chroot)
+        self._check_chroot(chroot)
 
         chroot_path = ('' if chroot is None else chroot) + path
 
         kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
         cmd = "%s %s -server %s get %s" % \
-              (kafka_run_class, self.java_query_class_name(), 
self.connect_setting(), chroot_path)
+              (kafka_run_class, self.java_cli_class_name(), 
self.connect_setting(), chroot_path)
         self.logger.debug(cmd)
 
         node = self.nodes[0]
@@ -158,10 +161,25 @@ def query(self, path, chroot=None):
                     result = match.groups()[0]
         return result
 
+    def create(self, path, chroot=None):
+        """
+        Create an znode at the given path
+        """
+        self._check_chroot(chroot)
+
+        chroot_path = ('' if chroot is None else chroot) + path
+
+        kafka_run_class = self.path.script("kafka-run-class.sh", DEV_BRANCH)
+        cmd = "%s %s -server %s create %s ''" % \
+              (kafka_run_class, self.java_cli_class_name(), 
self.connect_setting(), chroot_path)
+        self.logger.debug(cmd)
+        output = self.nodes[0].account.ssh_output(cmd)
+        self.logger.debug(output)
+
     def java_class_name(self):
         """ The class name of the Zookeeper quorum peers. """
         return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
 
-    def java_query_class_name(self):
+    def java_cli_class_name(self):
         """ The class name of the Zookeeper tool within Kafka. """
         return "kafka.tools.ZooKeeperMainWrapper"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> System tests do not handle ZK chroot properly with SCRAM
> --------------------------------------------------------
>
>                 Key: KAFKA-6676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6676
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>            Priority: Major
>
> This is related to the issue observed inĀ KAFKA-6672. There, we are now 
> automatically creating parent nodes if they do not exist. However, if using a 
> chroot within ZK and that chroot does not yet exist, you get an error message 
> about "Path length must be > 0" as it tries to create all the parent paths.
> It would probably be better to be able to detect this issue and account for 
> it, but currently system test code will fail if you use SCRAM and a chroot 
> because while Kafka will create the chroot when it starts up, there are some 
> commands related to security that may need to be executed before that and 
> assume the chroot will already be there.
> We're currently missing this because while the chroot option is there, 
> nothing in Kafka's tests are currently exercising it. So given what is 
> apparently a common assumption in tools that the chroot already exists (since 
> I think the core kafka server is the only thing that handles creating it if 
> needed), I think the fix here would be two-fold:
>  # Make KafkaService ensure the chroot exists before running any commands 
> that might need it.
>  # On at least one test that exercises security support, use a zk_chroot so 
> that functionality is at least reasonably well exercised.
> It would be good to have this in both trunk and 1.1 branches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to