This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new d96c7ea  KAFKA-7834: Extend collected logs in system test services to include heap dumps
d96c7ea is described below

commit d96c7eae0b48bb222f08771848a4e5f9df7a6f73
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Mon Feb 4 16:46:03 2019 -0800

    KAFKA-7834: Extend collected logs in system test services to include heap dumps
    
    * Enable heap dumps on OOM with -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=<file.bin> in the major services in system tests
    * Collect the heap dump from the predefined location as part of the result logs for each service
    * Change Connect service to delete the whole root directory instead of individual expected files
    * Tested by running the full suite of system tests
    
    Author: Konstantine Karantasis <konstant...@confluent.io>
    
    Reviewers: Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #6158 from kkonstantine/KAFKA-7834
    
    (cherry picked from commit 83c435f3babec485cf2091532191fe5420c27820)
    Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org>
---
 tests/kafkatest/services/connect.py     | 26 ++++++++++++++++++++++----
 tests/kafkatest/services/kafka/kafka.py | 11 +++++++++--
 tests/kafkatest/services/zookeeper.py   | 10 ++++++++--
 3 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index bf38e50..40c2cf3 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -42,6 +42,7 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     PID_FILE = os.path.join(PERSISTENT_ROOT, "connect.pid")
     EXTERNAL_CONFIGS_FILE = os.path.join(PERSISTENT_ROOT, 
"connect-external-configs.properties")
     CONNECT_REST_PORT = 8083
+    HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "connect_heap_dump.bin")
 
     # Currently the Connect worker supports waiting on three modes:
     STARTUP_MODE_INSTANT = 'INSTANT'
@@ -61,6 +62,9 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
         "connect_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "connect_heap_dump_file": {
+            "path": HEAP_DUMP_FILE,
+            "collect_default": True}
     }
 
     def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec = 
60):
@@ -160,8 +164,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def clean_node(self, node):
         node.account.kill_process("connect", clean_shutdown=False, 
allow_fail=True)
         self.security_config.clean_node(node)
-        all_files = " ".join([self.CONFIG_FILE, self.LOG4J_CONFIG_FILE, 
self.PID_FILE, self.LOG_FILE, self.STDOUT_FILE, self.STDERR_FILE, 
self.EXTERNAL_CONFIGS_FILE] + self.config_filenames() + self.files)
-        node.account.ssh("rm -rf " + all_files, allow_fail=False)
+        other_files = " ".join(self.config_filenames() + self.files)
+        node.account.ssh("rm -rf -- %s %s" % 
(ConnectServiceBase.PERSISTENT_ROOT, other_files), allow_fail=False)
 
     def config_filenames(self):
         return [os.path.join(self.PERSISTENT_ROOT, "connect-connector-" + 
str(idx) + ".properties") for idx, template in 
enumerate(self.connector_config_templates or [])]
@@ -252,6 +256,14 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
     def _base_url(self, node):
         return 'http://' + node.account.externally_routable_ip + ':' + 
str(self.CONNECT_REST_PORT)
 
+    def append_to_environment_variable(self, envvar, value):
+        env_opts = self.environment[envvar]
+        if env_opts is None:
+            env_opts = "\"%s\"" % value
+        else:
+            env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
+        self.environment[envvar] = env_opts
+
 
 class ConnectStandaloneService(ConnectServiceBase):
     """Runs Kafka Connect in standalone mode."""
@@ -266,7 +278,10 @@ class ConnectStandaloneService(ConnectServiceBase):
 
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " 
% self.LOG4J_CONFIG_FILE
-        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=%s" % \
+                          self.logs["connect_heap_dump_file"]["path"]
+        other_kafka_opts = self.security_config.kafka_opts.strip('\"')
+        cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, 
other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-standalone.sh", node), 
self.CONFIG_FILE)
@@ -314,7 +329,10 @@ class ConnectDistributedService(ConnectServiceBase):
     # connector_configs argument is intentionally ignored in distributed 
service.
     def start_cmd(self, node, connector_configs):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " 
% self.LOG4J_CONFIG_FILE
-        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=%s" % \
+                          self.logs["connect_heap_dump_file"]["path"]
+        other_kafka_opts = self.security_config.kafka_opts.strip('\"')
+        cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, 
other_kafka_opts)
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), 
self.CONFIG_FILE)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 0e05147..3d198b1 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -49,6 +49,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
     # Kafka Authorizer
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
+    HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -65,7 +66,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             "collect_default": False},
         "kafka_data_2": {
             "path": DATA_LOG_DIR_2,
-            "collect_default": False}
+            "collect_default": False},
+        "kafka_heap_dump_file": {
+            "path": HEAP_DUMP_FILE,
+            "collect_default": True}
     }
 
     def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAINTEXT, 
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
@@ -229,7 +233,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def start_cmd(self, node):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
         cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % 
self.LOG4J_CONFIG
-        cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=%s" % \
+                          self.logs["kafka_heap_dump_file"]["path"]
+        other_kafka_opts = self.security_config.kafka_opts.strip('\"')
+        cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, 
other_kafka_opts)
         cmd += "%s %s 1>> %s 2>> %s &" % \
                (self.path.script("kafka-server-start.sh", node),
                 KafkaService.CONFIG_FILE,
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 5bda867..f6a6b02 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -30,6 +30,7 @@ from kafkatest.version import DEV_BRANCH
 class ZookeeperService(KafkaPathResolverMixin, Service):
     ROOT = "/mnt/zookeeper"
     DATA = os.path.join(ROOT, "data")
+    HEAP_DUMP_FILE = os.path.join(ROOT, "zk_heap_dump.bin")
 
     logs = {
         "zk_log": {
@@ -37,7 +38,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             "collect_default": True},
         "zk_data": {
             "path": DATA,
-            "collect_default": False}
+            "collect_default": False},
+        "zk_heap_dump_file": {
+            "path": HEAP_DUMP_FILE,
+            "collect_default": True}
     }
 
     def __init__(self, context, num_nodes, zk_sasl = False):
@@ -76,8 +80,10 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         self.logger.info(config_file)
         node.account.create_file("%s/zookeeper.properties" % 
ZookeeperService.ROOT, config_file)
 
-        start_cmd = "export KAFKA_OPTS=\"%s\";" % (self.kafka_opts + ' ' + 
self.security_system_properties) \
+        heap_kafka_opts = "-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=%s" % self.logs["zk_heap_dump_file"]["path"]
+        other_kafka_opts = self.kafka_opts + ' ' + 
self.security_system_properties \
             if self.security_config.zk_sasl else self.kafka_opts
+        start_cmd = "export KAFKA_OPTS=\"%s %s\";" % (heap_kafka_opts, 
other_kafka_opts)
         start_cmd += "%s " % self.path.script("zookeeper-server-start.sh", 
node)
         start_cmd += "%s/zookeeper.properties &>> %s &" % 
(ZookeeperService.ROOT, self.logs["zk_log"]["path"])
         node.account.ssh(start_cmd)

Reply via email to