Repository: kafka
Updated Branches:
  refs/heads/trunk 6acd37720 -> 1d2ae89c5


KAFKA-2439; Add MirrorMaker service class for system tests

Added MirrorMaker service and a few corresponding sanity checks, as well as
necessary config template files. A few additional updates to accommodate the
change in wait_until from ducktape 0.2.0 -> 0.3.0.
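
For a sense of how the new service is driven, here is a minimal sketch based on
the sanity check included in this patch (source_kafka and target_kafka stand for
the two KafkaService instances the test sets up; this is illustrative, not part
of the commit):

    mm = MirrorMaker(test_context, num_nodes=1,
                     source=source_kafka, target=target_kafka,
                     whitelist=".*", consumer_timeout_ms=2000)
    mm.start()   # start_node blocks until a MirrorMaker JVM is visible on each node
    # ... produce to the source cluster, consume from the target ...
    mm.stop()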

Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Ewen Cheslack-Postava, Gwen Shapira

Closes #148 from granders/KAFKA-2439 and squashes the following commits:

c7c3ebd [Geoff Anderson] MirrorMaker can now run as a multi-node service. Added
kill -9 to various clean_node methods.
1e806f2 [Geoff Anderson] Various cleanups per review.
1b4b049 [Geoff Anderson] Added MirrorMaker service and a few corresponding
sanity checks, as well as necessary config template files. A few additional
updates to accommodate the change in wait_until from ducktape 0.2.0 -> 0.3.0.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d2ae89c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d2ae89c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d2ae89c

Branch: refs/heads/trunk
Commit: 1d2ae89c5a1dc5d18b8188bf737a8e1d195be325
Parents: 6acd377
Author: Geoff Anderson <ge...@confluent.io>
Authored: Sat Aug 22 19:23:36 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Sat Aug 22 19:23:36 2015 -0700

----------------------------------------------------------------------
 .../sanity_checks/test_console_consumer.py      |  12 +-
 .../sanity_checks/test_mirror_maker.py          |  90 ++++++++++
 tests/kafkatest/services/console_consumer.py    |  17 +-
 tests/kafkatest/services/kafka.py               |   1 +
 tests/kafkatest/services/mirror_maker.py        | 165 +++++++++++++++++++
 .../templates/console_consumer.properties       |   4 +-
 .../templates/console_consumer_log4j.properties |  26 ---
 .../services/templates/consumer.properties      |  23 +++
 .../services/templates/kafka.properties         |  80 ---------
 .../services/templates/producer.properties      |  28 ++++
 .../services/templates/tools_log4j.properties   |  26 +++
 tests/kafkatest/services/verifiable_producer.py |   4 +
 tests/kafkatest/services/zookeeper.py           |  16 ++
 tests/kafkatest/tests/replication_test.py       |   8 +-
 tests/setup.py                                  |   2 +-
 15 files changed, 379 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index cd8c8f9..3e523e1 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -61,20 +61,20 @@ class ConsoleConsumerTest(Test):
         self.consumer.start()
         node = self.consumer.nodes[0]
 
-        if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
-            raise Exception("Consumer was too slow to start")
+        wait_until(lambda: self.consumer.alive(node),
+            timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
         self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
 
         # Verify that log output is happening
-        if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
-            raise Exception("Timed out waiting for log file to exist")
+        wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10,
+                   err_msg="Timed out waiting for logging to start.")
         assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
 
         # Verify no consumed messages
         assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
 
         self.consumer.stop_node(node)
-        if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
-            raise Exception("Took too long for consumer to die.")
+
+
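
(The hunk above is the ducktape 0.2.0 -> 0.3.0 wait_until migration in action:
the old version returned False on timeout and the caller raised, while 0.3.0
raises on timeout itself, carrying the supplied err_msg. A sketch of the two
styles, using names from this test:

    from ducktape.utils.util import wait_until

    # ducktape 0.2.0 style: caller checks the return value and raises
    # if not wait_until(lambda: consumer.alive(node), timeout_sec=10, backoff_sec=.2):
    #     raise Exception("Consumer was too slow to start")

    # ducktape 0.3.0 style: wait_until raises on timeout with err_msg
    wait_until(lambda: consumer.alive(node),
               timeout_sec=10, backoff_sec=.2,
               err_msg="Consumer was too slow to start")

The trailing stop/alive check could be dropped from the test because stop_node
now performs the same wait, as shown in the console_consumer.py hunk below.)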
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/sanity_checks/test_mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_mirror_maker.py b/tests/kafkatest/sanity_checks/test_mirror_maker.py
new file mode 100644
index 0000000..3481d7a
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_mirror_maker.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.tests.test import Test
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.mirror_maker import MirrorMaker
+
+
+class TestMirrorMakerService(Test):
+    """Sanity checks on mirror maker service class."""
+    def __init__(self, test_context):
+        super(TestMirrorMakerService, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.source_zk = ZookeeperService(test_context, num_nodes=1)
+        self.target_zk = ZookeeperService(test_context, num_nodes=1)
+
+        self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+
+        self.num_messages = 1000
+        # This will produce to source kafka cluster
+        self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic,
+                                           max_messages=self.num_messages, throughput=1000)
+
+        # Use a regex whitelist to check that the start command is well-formed in this case
+        self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka,
+                                        whitelist=".*", consumer_timeout_ms=2000)
+
+        # This will consume from target kafka cluster
+        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic,
+                                        consumer_timeout_ms=1000)
+
+    def setUp(self):
+        # Source cluster
+        self.source_zk.start()
+        self.source_kafka.start()
+
+        # Target cluster
+        self.target_zk.start()
+        self.target_kafka.start()
+
+    def test_end_to_end(self):
+        """
+        Test end-to-end behavior under non-failure conditions.
+
+        Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster.
+        One is source, and the other is target. Single-node mirror maker mirrors from source to target.
+
+        - Start mirror maker.
+        - Produce a small number of messages to the source cluster.
+        - Consume messages from target.
+        - Verify that number of consumed messages matches the number produced.
+        """
+        self.mirror_maker.start()
+        # Check that consumer_timeout_ms setting made it to config file
+        self.mirror_maker.nodes[0].account.ssh(
+            "grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False)
+
+        self.producer.start()
+        self.producer.wait()
+        self.consumer.start()
+        self.consumer.wait()
+
+        num_consumed = len(self.consumer.messages_consumed[1])
+        num_produced = self.producer.num_acked
+        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
+        assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)
+
+        self.mirror_maker.stop()
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 18c9f63..ffde6a2 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -14,8 +14,10 @@
 # limitations under the License.
 
 from ducktape.services.background_thread import BackgroundThreadService
+from ducktape.utils.util import wait_until
 
 import os
+import subprocess
 
 
 def is_int(msg):
@@ -91,7 +93,7 @@ class ConsoleConsumer(BackgroundThreadService):
             "collect_default": True}
         }
 
-    def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
+    def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None):
         """
         Args:
             context:                    standard context
@@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService):
             cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
             pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
             return pid_arr
-        except:
+        except (subprocess.CalledProcessError, ValueError) as e:
             return []
 
     def alive(self, node):
@@ -161,7 +163,7 @@ class ConsoleConsumer(BackgroundThreadService):
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
 
         # Create and upload log properties
-        log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
+        log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
         node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
 
         # Run and capture output
@@ -169,7 +171,8 @@ class ConsoleConsumer(BackgroundThreadService):
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
         for line in node.account.ssh_capture(cmd, allow_fail=False):
             msg = line.strip()
-            msg = self.message_validator(msg)
+            if self.message_validator is not None:
+                msg = self.message_validator(msg)
             if msg is not None:
                 self.logger.debug("consumed a message: " + str(msg))
                 self.messages_consumed[idx].append(msg)
@@ -179,7 +182,13 @@ class ConsoleConsumer(BackgroundThreadService):
 
     def stop_node(self, node):
         node.account.kill_process("java", allow_fail=True)
+        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.2,
+                   err_msg="Timed out waiting for consumer to stop.")
 
     def clean_node(self, node):
+        if self.alive(node):
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+                             (self.__class__.__name__, node.account))
+        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py
index 34ec5ef..76f9cf6 100644
--- a/tests/kafkatest/services/kafka.py
+++ b/tests/kafkatest/services/kafka.py
@@ -93,6 +93,7 @@ class KafkaService(Service):
         node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
 
     def clean_node(self, node):
+        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
 
     def create_topic(self, topic_cfg):

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/mirror_maker.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py
new file mode 100644
index 0000000..afbed13
--- /dev/null
+++ b/tests/kafkatest/services/mirror_maker.py
@@ -0,0 +1,165 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.service import Service
+from ducktape.utils.util import wait_until
+
+import os
+import subprocess
+
+"""
+0.8.2.1 MirrorMaker options
+
+Option                                  Description
+------                                  -----------
+--abort.on.send.failure <Stop the       Configure the mirror maker to exit on
+  entire mirror maker when a send         a failed send. (default: true)
+  failure occurs>
+--blacklist <Java regex (String)>       Blacklist of topics to mirror.
+--consumer.config <config file>         Embedded consumer config for consuming
+                                          from the source cluster.
+--consumer.rebalance.listener <A        The consumer rebalance listener to use
+  custom rebalance listener of type       for mirror maker consumer.
+  ConsumerRebalanceListener>
+--help                                  Print this message.
+--message.handler <A custom message     Message handler which will process
+  handler of type                         every record in-between consumer and
+  MirrorMakerMessageHandler>              producer.
+--message.handler.args <Arguments       Arguments used by custom rebalance
+  passed to message handler               listener for mirror maker consumer
+  constructor.>
+--num.streams <Integer: Number of       Number of consumption streams.
+  threads>                                (default: 1)
+--offset.commit.interval.ms <Integer:   Offset commit interval in ms (default:
+  offset commit interval in               60000)
+  millisecond>
+--producer.config <config file>         Embedded producer config.
+--rebalance.listener.args <Arguments    Arguments used by custom rebalance
+  passed to custom rebalance listener     listener for mirror maker consumer
+  constructor as a string.>
+--whitelist <Java regex (String)>       Whitelist of topics to mirror.
+"""
+
+
+class MirrorMaker(Service):
+
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/mirror_maker"
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
+    CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
+    KAFKA_HOME = "/opt/kafka/"
+
+    logs = {
+        "mirror_maker_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+        }
+
+    def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None):
+        """
+        MirrorMaker mirrors messages from one or more source clusters to a single destination cluster.
+
+        Args:
+            context:                    standard context
+            source:                     source Kafka cluster
+            target:                     target Kafka cluster to which data will be mirrored
+            whitelist:                  whitelist regex for topics to mirror
+            blacklist:                  blacklist regex for topics not to mirror
+            num_streams:                number of consumer threads to create; can be a single int, or a list with
+                                            one value per node, allowing num_streams to be the same for each node,
+                                            or configured independently per-node
+            consumer_timeout_ms:        consumer stops if t > consumer_timeout_ms elapses between consecutive messages
+        """
+        super(MirrorMaker, self).__init__(context, num_nodes=num_nodes)
+
+        self.consumer_timeout_ms = consumer_timeout_ms
+        self.num_streams = num_streams
+        if not isinstance(num_streams, int):
+            # if not an integer, num_streams should be configured per-node
+            assert len(num_streams) == num_nodes
+        self.whitelist = whitelist
+        self.blacklist = blacklist
+        self.source = source
+        self.target = target
+
+    def start_cmd(self, node):
+        cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
+        cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
+        cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
+        cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
+        if isinstance(self.num_streams, int):
+            cmd += " --num.streams %d" % self.num_streams
+        else:
+            # config num_streams separately on each node
+            cmd += " --num.streams %d" % self.num_streams[self.idx(node) - 1]
+        if self.whitelist is not None:
+            cmd += " --whitelist=\"%s\"" % self.whitelist
+        if self.blacklist is not None:
+            cmd += " --blacklist=\"%s\"" % self.blacklist
+        cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE)
+        return cmd
+
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
+    def start_node(self, node):
+        node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
+        node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False)
+
+        # Create, upload one consumer config file for source cluster
+        consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting())
+        node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props)
+
+        # Create, upload producer properties file for target cluster
+        producer_props = self.render('producer.properties',  broker_list=self.target.bootstrap_servers(),
+                                     producer_type="async")
+        node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE)
+        node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config)
+
+        # Run mirror maker
+        cmd = self.start_cmd(node)
+        self.logger.debug("Mirror maker command: %s", cmd)
+        node.account.ssh(cmd, allow_fail=False)
+        wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5,
+                   err_msg="Mirror maker took too long to start.")
+        self.logger.debug("Mirror maker is alive")
+
+    def stop_node(self, node):
+        node.account.kill_process("java", allow_fail=True)
+        wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5,
+                   err_msg="Mirror maker took too long to stop.")
+
+    def clean_node(self, node):
+        if self.alive(node):
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+                             (self.__class__.__name__, node.account))
+        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False)
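
A note on the num_streams handling above: it accepts either a single int or a
per-node list, and start_cmd indexes the list by node. A hypothetical two-node
configuration (values purely illustrative; source_kafka/target_kafka as in the
sanity check):

    # Node 1 runs 4 consumer streams, node 2 runs 8.
    mm = MirrorMaker(test_context, num_nodes=2,
                     source=source_kafka, target=target_kafka,
                     whitelist=".*", num_streams=[4, 8])

Per start_cmd, node 1 then launches something along the lines of
kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ...
--producer.config ... --num.streams 4 --whitelist=".*", with output appended
to the mirror maker log file.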

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 944c2c9..7143179 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -14,6 +14,6 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
-{% if consumer_timeout_ms is not none %}
+{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
-{% endif %}
\ No newline at end of file
+{% endif %}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/console_consumer_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer_log4j.properties b/tests/kafkatest/services/templates/console_consumer_log4j.properties
deleted file mode 100644
index e63e6d6..0000000
--- a/tests/kafkatest/services/templates/console_consumer_log4j.properties
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Define the root logger with appender file
-log4j.rootLogger = INFO, FILE
-
-log4j.appender.FILE=org.apache.log4j.FileAppender
-log4j.appender.FILE.File={{ log_file }}
-log4j.appender.FILE.ImmediateFlush=true
-log4j.appender.FILE.Threshold=debug
-# Set the append to false, overwrite
-log4j.appender.FILE.Append=false
-log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
-log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/consumer.properties b/tests/kafkatest/services/templates/consumer.properties
new file mode 100644
index 0000000..b8723d1
--- /dev/null
+++ b/tests/kafkatest/services/templates/consumer.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+zookeeper.connect={{ zookeeper_connect }}
+zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }}
+group.id={{ group_id|default('test-consumer-group') }}
+
+{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}
+consumer.timeout.ms={{ consumer_timeout_ms }}
+{% endif %}
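
The "is defined" guard here (and in console_consumer.properties above) matters
because these templates may be rendered without consumer_timeout_ms being
passed at all: in Jinja2, which ducktape uses for rendering, an undefined
variable still satisfies "is not none", so the old guard would emit a bare
"consumer.timeout.ms=" line. A quick illustration:

    from jinja2 import Template

    tmpl = Template("{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %}"
                    "consumer.timeout.ms={{ consumer_timeout_ms }}{% endif %}")
    print(tmpl.render())                          # "" -- guard keeps the line out entirely
    print(tmpl.render(consumer_timeout_ms=2000))  # "consumer.timeout.ms=2000"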

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/kafka.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties
index db1077a..6650d23 100644
--- a/tests/kafkatest/services/templates/kafka.properties
+++ b/tests/kafkatest/services/templates/kafka.properties
@@ -14,108 +14,28 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
-############################# Server Basics #############################
 
-# The id of the broker. This must be set to a unique integer for each broker.
 broker.id={{ broker_id }}
-
-############################# Socket Server Settings #############################
-
-# The port the socket server listens on
 port=9092
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
 #host.name=localhost
-
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured.  Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
 advertised.host.name={{ node.account.hostname }}
-
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
 #advertised.port=<port accessible by clients>
-
-# The number of threads handling network requests
 num.network.threads=3
- 
-# The number of threads doing disk I/O
 num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
 socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
 socket.receive.buffer.bytes=65536
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
 socket.request.max.bytes=104857600
 
-
-############################# Log Basics #############################
-
-# A comma seperated list of directories under which to store log files
 log.dirs=/mnt/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
 num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
 num.recovery.threads.per.data.dir=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
 #log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
 #log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion
 log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
 #log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
 log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
 log.retention.check.interval.ms=300000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
 log.cleaner.enable=false
 
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
 zookeeper.connect={{ zk.connect_setting() }}
-
-# Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=2000

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/producer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/producer.properties b/tests/kafkatest/services/templates/producer.properties
new file mode 100644
index 0000000..ede60c8
--- /dev/null
+++ b/tests/kafkatest/services/templates/producer.properties
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+metadata.broker.list={{ broker_list }}
+bootstrap.servers = {{ broker_list }}
+producer.type={{ producer_type }}  # sync or async
+compression.codec=none
+serializer.class=kafka.serializer.DefaultEncoder
+
+#partitioner.class=
+#compressed.topics=
+#queue.buffering.max.ms=
+#queue.buffering.max.messages=
+#queue.enqueue.timeout.ms=
+#batch.num.messages=

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/tools_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties
new file mode 100644
index 0000000..e63e6d6
--- /dev/null
+++ b/tests/kafkatest/services/templates/tools_log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define the root logger with appender file
+log4j.rootLogger = INFO, FILE
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File={{ log_file }}
+log4j.appender.FILE.ImmediateFlush=true
+log4j.appender.FILE.Threshold=debug
+# Set the append to false, overwrite
+log4j.appender.FILE.Append=false
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index cca8227..158db7a 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -89,12 +89,16 @@ class VerifiableProducer(BackgroundThreadService):
 
     def stop_node(self, node):
         node.account.kill_process("VerifiableProducer", allow_fail=False)
+        if self.worker_threads is None:
+            return
+
         # block until the corresponding thread exits
         if len(self.worker_threads) >= self.idx(node):
             # Need to guard this because stop is preemptively called before the worker threads are added and started
             self.worker_threads[self.idx(node) - 1].join()
 
     def clean_node(self, node):
+        node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
         node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
 
     def try_parse_json(self, string):

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/zookeeper.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py
index 56f4606..09bec35 100644
--- a/tests/kafkatest/services/zookeeper.py
+++ b/tests/kafkatest/services/zookeeper.py
@@ -16,6 +16,7 @@
 
 from ducktape.services.service import Service
 
+import subprocess
 import time
 
 
@@ -51,6 +52,17 @@ class ZookeeperService(Service):
 
         time.sleep(5)  # give it some time to start
 
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
     def stop_node(self, node):
         idx = self.idx(node)
         self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
@@ -58,6 +70,10 @@ class ZookeeperService(Service):
 
     def clean_node(self, node):
         self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
+        if self.alive(node):
+            self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
+                             (self.__class__.__name__, node.account))
+        node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True)
         node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
 
     def connect_setting(self):

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/tests/replication_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py
index fed1ea1..755fb42 100644
--- a/tests/kafkatest/tests/replication_test.py
+++ b/tests/kafkatest/tests/replication_test.py
@@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.console_consumer import ConsoleConsumer, is_int
 
 import signal
 import time
@@ -76,12 +76,12 @@ class ReplicationTest(Test):
 
         """
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
-        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int)
 
         # Produce in a background thread while driving broker failures
         self.producer.start()
-        if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
-            raise RuntimeError("Producer failed to start in a reasonable amount of time.")
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
+             err_msg="Producer failed to start in a reasonable amount of time.")
         failure()
         self.producer.stop()
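
Related to the import change above: message_validator on ConsoleConsumer now
defaults to None rather than is_int, so consumers in generic tests pass
messages through untouched, while tests that produce sequential integers opt
in explicitly as this one does. For reference, is_int is roughly (a sketch,
not the exact committed body):

    def is_int(msg):
        # Return the message as an int, or None if it does not parse.
        try:
            return int(msg)
        except ValueError:
            return None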
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/setup.py
----------------------------------------------------------------------
diff --git a/tests/setup.py b/tests/setup.py
index 5ce4bb7..a2fa71a 100644
--- a/tests/setup.py
+++ b/tests/setup.py
@@ -23,5 +23,5 @@ setup(name="kafkatest",
       platforms=["any"], 
       license="apache2.0",
       packages=find_packages(),
-      requires=["ducktape(>=0.2.0)"]
+      requires=["ducktape(==0.3.0)"]
       )
