Repository: samza
Updated Branches:
  refs/heads/master c7ac26377 -> 41c74b968


SAMZA-548; add performance test for container with kafka consumer and producer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/41c74b96
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/41c74b96
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/41c74b96

Branch: refs/heads/master
Commit: 41c74b96876473acb6403e544bec7a00a04d2fa3
Parents: c7ac263
Author: Chris Riccomini <[email protected]>
Authored: Fri Feb 13 17:10:03 2015 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Fri Feb 13 17:10:03 2015 -0800

----------------------------------------------------------------------
 samza-shell/src/main/bash/stat-yarn-job.sh      | 21 +++++
 .../src/main/config/negate-number.properties    | 18 +----
 .../kafka-read-write-performance.properties     | 35 ++++++++
 .../test/integration/NegateNumberTask.java      | 44 +++++++++-
 .../src/main/python/configs/downloads.json      |  2 +-
 samza-test/src/main/python/configs/kafka.json   | 22 ++---
 .../python/configs/smoke-tests/smoke-tests.json |  6 --
 samza-test/src/main/python/configs/tests.json   |  5 ++
 samza-test/src/main/python/deployment.py        | 21 ++---
 .../src/main/python/samza_job_yarn_deployer.py  | 47 ++++++++++-
 samza-test/src/main/python/tests.py             |  3 +-
 .../src/main/python/tests/performance_tests.py  | 80 +++++++++++++++++++
 samza-test/src/main/python/tests/smoke_tests.py | 83 +++++++------------
 samza-test/src/main/python/tests/util.py        | 84 ++++++++++++++++++++
 14 files changed, 359 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-shell/src/main/bash/stat-yarn-job.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/stat-yarn-job.sh 
b/samza-shell/src/main/bash/stat-yarn-job.sh
new file mode 100755
index 0000000..e5f6847
--- /dev/null
+++ b/samza-shell/src/main/bash/stat-yarn-job.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS 
-Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh 
org.apache.hadoop.yarn.client.cli.ApplicationCLI application -status "$@"

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties 
b/samza-test/src/main/config/negate-number.properties
index 379fa61..b9f898c 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -21,18 +21,12 @@ job.name=samza-negate-number
 
 # YARN
 yarn.container.count=1
-yarn.container.memory.mb=1024
 
 # Task
 task.class=org.apache.samza.test.integration.NegateNumberTask
 task.inputs=kafka.samza-test-topic
-task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.replication.factor=1
-task.checkpoint.system=kafka
-task.lifecycle.listener.generator.class=com.linkedin.samza.task.GeneratorLifecycleListenerFactory
-task.lifecycle.listener.generator.fabric=CORP-EAT1
-task.opts=-Xmx6g
-task.command.class=org.apache.samza.job.ShellCommandBuilder
+task.max.messages=50
+task.outputs=kafka.samza-test-topic-output
 
 # Serializers
 
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
@@ -43,12 +37,4 @@ systems.kafka.samza.msg.serde=string
 systems.kafka.samza.key.serde=string
 systems.kafka.samza.offset.default=oldest
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
-systems.kafka.producer.compression.type=gzip
 systems.kafka.producer.bootstrap.servers=localhost:9092
-systems.kafka.producer.acks=1
-systems.kafka.producer.metadata.max.age.ms=86400000
-# Normally, we'd set this much higher, but we want things to look snappy in 
the demo.
-systems.kafka.producer.buffer.memory=1000000
-
-# negate-number
-streams.samza-test-topic.consumer.reset.offset=true

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/perf/kafka-read-write-performance.properties
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/config/perf/kafka-read-write-performance.properties 
b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
new file mode 100644
index 0000000..122b14a
--- /dev/null
+++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kafka-read-write-performance
+
+# YARN
+yarn.container.count=1
+
+# Task
+task.class=org.apache.samza.test.performance.TestPerformanceTask
+task.inputs=kafka.kafka-read-write-performance-input
+task.outputs=kafka.kafka-read-write-performance-output
+task.max.messages=1000000
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.offset.default=oldest
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
 
b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
index 782e9f4..617cea6 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java
@@ -19,22 +19,58 @@
 
 package org.apache.samza.test.integration;
 
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.TaskCoordinator.RequestScope;
+import org.apache.samza.util.Util;
 
-/*
- * A simple test job that reads strings, converts them to integers, multiplies 
+/**
+ * A simple test job that reads strings, converts them to integers, multiplies
  * by -1, and outputs to "samza-test-topic-output" stream.
  */
-public class NegateNumberTask implements StreamTask {
+public class NegateNumberTask implements StreamTask, InitableTask {
+  /**
+   * How many messages all tasks in a single container have processed.
+   */
+  private static int messagesProcessed = 0;
+
+  /**
+   * How many messages to process before shutting down.
+   */
+  private int maxMessages;
+
+  /**
+   * The SystemStream to send negated numbers to.
+   */
+  private SystemStream outputSystemStream;
+
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    maxMessages = config.getInt("task.max.messages", 50);
+    String outputSystemStreamString = config.get("task.outputs", null);
+    if (outputSystemStreamString == null) {
+      throw new ConfigException("Missing required configuration: 
task.outputs");
+    }
+    outputSystemStream = 
Util.getSystemStreamFromNames(outputSystemStreamString);
+  }
+
+  @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
+    messagesProcessed += 1;
     String input = (String) envelope.getMessage();
     Integer number = Integer.valueOf(input);
     Integer output = number.intValue() * -1;
-    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", 
"samza-test-topic-output"), output.toString()));
+    collector.send(new OutgoingMessageEnvelope(outputSystemStream, 
output.toString()));
+    if (messagesProcessed >= maxMessages) {
+      coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/downloads.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/downloads.json 
b/samza-test/src/main/python/configs/downloads.json
index 8ded306..a75756f 100644
--- a/samza-test/src/main/python/configs/downloads.json
+++ b/samza-test/src/main/python/configs/downloads.json
@@ -1,5 +1,5 @@
 {
-  "url_kafka": 
"http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz";,
+  "url_kafka": 
"http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz";,
   "url_zookeeper": 
"http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz";,
   "url_hadoop": 
"https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz";
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/kafka.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/kafka.json 
b/samza-test/src/main/python/configs/kafka.json
index 9a7af19..ab2f346 100644
--- a/samza-test/src/main/python/configs/kafka.json
+++ b/samza-test/src/main/python/configs/kafka.json
@@ -3,21 +3,21 @@
     "kafka_instance_0": "localhost"
   },
   "kafka_port": 9092,
-  "kafka_start_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh -daemon 
kafka_2.9.2-0.8.1.1/config/server.properties",
-  "kafka_stop_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh",
+  "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon 
kafka_2.9.2-0.8.2.0/config/server.properties",
+  "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
   "kafka_install_path": "deploy/kafka",
-  "kafka_executable": "kafka_2.9.2-0.8.1.1.tgz",
+  "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz",
   "kafka_post_install_cmds": [
-    "sed -i.bak 's/SIGINT/SIGTERM/g' 
kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh",
-    "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' 
kafka_2.9.2-0.8.1.1/config/server.properties",
-    "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' 
kafka_2.9.2-0.8.1.1/config/server.properties"
+    "sed -i.bak 's/SIGINT/SIGTERM/g' 
kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh",
+    "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' 
kafka_2.9.2-0.8.2.0/config/server.properties",
+    "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' 
kafka_2.9.2-0.8.2.0/config/server.properties"
   ],
   "kafka_logs": [
     "log-cleaner.log",
-    "kafka_2.9.2-0.8.1.1/logs/controller.log",
-    "kafka_2.9.2-0.8.1.1/logs/kafka-request.log",
-    "kafka_2.9.2-0.8.1.1/logs/kafkaServer-gc.log",
-    "kafka_2.9.2-0.8.1.1/logs/server.log",
-    "kafka_2.9.2-0.8.1.1/logs/state-change.log"
+    "kafka_2.9.2-0.8.2.0/logs/controller.log",
+    "kafka_2.9.2-0.8.2.0/logs/kafka-request.log",
+    "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log",
+    "kafka_2.9.2-0.8.2.0/logs/server.log",
+    "kafka_2.9.2-0.8.2.0/logs/state-change.log"
   ]
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json 
b/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
deleted file mode 100644
index 65f8568..0000000
--- a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json
+++ /dev/null
@@ -1,6 +0,0 @@
-{
-  "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz",
-  "samza_install_path": "deploy/smoke_tests",
-  "samza_config_factory": 
"org.apache.samza.config.factories.PropertiesConfigFactory",
-  "samza_config_file": "config/negate-number.properties"
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/tests.json
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/configs/tests.json 
b/samza-test/src/main/python/configs/tests.json
new file mode 100644
index 0000000..5251af9
--- /dev/null
+++ b/samza-test/src/main/python/configs/tests.json
@@ -0,0 +1,5 @@
+{
+  "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz",
+  "samza_install_path": "deploy/smoke_tests",
+  "samza_config_factory": 
"org.apache.samza.config.factories.PropertiesConfigFactory"
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/deployment.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/deployment.py 
b/samza-test/src/main/python/deployment.py
index a0e1481..89ba728 100644
--- a/samza-test/src/main/python/deployment.py
+++ b/samza-test/src/main/python/deployment.py
@@ -76,36 +76,25 @@ def setup_suite():
         'hostname': host
       })
 
-  # Start the Samza jobs.
+  # Setup Samza job deployer.
   samza_job_deployer = SamzaJobYarnDeployer({
+    'config_factory': c('samza_config_factory'),
     'yarn_site_template': c('yarn_site_template'),
     'yarn_driver_configs': c('yarn_driver_configs'),
     'yarn_nm_hosts': c('yarn_nm_hosts').values(),
     'install_path': samza_install_path,
   })
 
-  samza_job_deployer.install('smoke_tests', {
+  samza_job_deployer.install('tests', {
     'executable': c('samza_executable'),
   })
 
-  samza_job_deployer.start('negate_number', {
-    'package_id': 'smoke_tests',
-    'config_factory': c('samza_config_factory'),
-    'config_file': c('samza_config_file'),
-    'install_path': samza_install_path,
-  })
+  runtime.set_deployer('samza_job_deployer', samza_job_deployer)
 
 def teardown_suite():
-  # Stop the samza jobs.
-  samza_job_deployer.stop('negate_number', {
-    'package_id': 'smoke_tests',
-    'install_path': samza_install_path,
-  })
-
-  samza_job_deployer.uninstall('smoke_tests')
+  samza_job_deployer.uninstall('tests')
 
   # Undeploy everything.
   for name, deployer in deployers.iteritems():
     for instance, host in c(name + '_hosts').iteritems():
       deployer.undeploy(instance)
-

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/samza_job_yarn_deployer.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/samza_job_yarn_deployer.py 
b/samza-test/src/main/python/samza_job_yarn_deployer.py
index e18bc58..38635ca 100644
--- a/samza-test/src/main/python/samza_job_yarn_deployer.py
+++ b/samza-test/src/main/python/samza_job_yarn_deployer.py
@@ -39,7 +39,7 @@ class SamzaJobYarnDeployer(Deployer):
     to start and stop Samza jobs in a YARN grid.
 
     param: configs -- Map of config key/values pairs. These configs will be 
used
-    as a default whenever overrides are not provided in the methods (intall, 
+    as a default whenever overrides are not provided in the methods (install, 
     start, stop, etc) below.
     """
     logging.getLogger("paramiko").setLevel(logging.ERROR)
@@ -173,6 +173,47 @@ class SamzaJobYarnDeployer(Deployer):
       p.wait()
       assert p.returncode == 0, "Command returned non-zero exit code ({0}): 
{1}".format(p.returncode, command)
 
+  def await(self, job_id, configs={}):
+    """
+    Waits for a Samza job to finish using bin/stat-yarn-job.sh. A job is 
+    finished when its "Final State" is not "UNDEFINED".
+
+    param: job_id -- A unique ID used to identify a Samza job.
+    param: configs -- Map of config key/value pairs. Valid keys include:
+
+    package_id: The package_id for the package that contains the code for 
job_id.
+    Usually, the package_id refers to the .tgz job tarball that contains the
+    code necessary to run job_id.
+    """
+    configs = self._get_merged_configs(configs)
+    self._validate_configs(configs, ['package_id'])
+
+    # Get configs.
+    package_id = configs.get('package_id')
+
+    # Get the application_id for the job.
+    application_id = self.app_ids.get(job_id)
+
+    # Stat the job, if it's been started, or WARN and return if it hasn't.
+    final_state = 'UNDEFINED'
+    if not application_id:
+      logger.warn("Can't stat a job that was never started: 
{0}".format(job_id))
+    else:
+      command = "{0} {1}".format(os.path.join(package_id, 
"bin/stat-yarn-job.sh"), application_id)
+      env = self._get_env_vars(package_id)
+      while final_state == 'UNDEFINED':
+        p = Popen(command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE, 
env=env)
+        output, err = p.communicate()
+        logger.debug("Output from run-job.sh:\nstdout: {0}\nstderr: 
{1}".format(output, err))
+        assert p.returncode == 0, "Command ({0}) returned non-zero exit code 
({1}).\nstdout: {2}\nstderr: {3}".format(command, p.returncode, output, err)
+
+        # Check the final state for the job.
+        regex = r'.*Final.State . (\w*)'
+        match = re.match(regex, output.replace("\n", ' '))
+        final_state = match.group(1)
+        logger.debug("Got final state {0} for job_id {1}.".format(final_state, 
job_id))
+    return final_state
+
   def uninstall(self, package_id, configs={}):
     """
     Removes the install path for package_id from all remote hosts that it's 
been
@@ -201,6 +242,10 @@ class SamzaJobYarnDeployer(Deployer):
 
   # TODO we should implement the below helper methods over time, as we need 
them.
 
+  def get_processes(self):
+    # TODO raise NotImplementedError
+    return []
+
   def get_pid(self, container_id, configs={}):
     raise NotImplementedError
 

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests.py 
b/samza-test/src/main/python/tests.py
index dae414e..df64e23 100644
--- a/samza-test/src/main/python/tests.py
+++ b/samza-test/src/main/python/tests.py
@@ -24,6 +24,7 @@ test = {
   'perf_code': os.path.join(dir, 'perf.py'),
   'configs_directory': os.path.join(dir, 'configs'),
   'test_code': [
-    os.path.join(dir, 'tests', 'smoke_tests.py')
+    os.path.join(dir, 'tests', 'smoke_tests.py'),
+    os.path.join(dir, 'tests', 'performance_tests.py'),
   ],
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/performance_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/performance_tests.py 
b/samza-test/src/main/python/tests/performance_tests.py
new file mode 100644
index 0000000..a97717f
--- /dev/null
+++ b/samza-test/src/main/python/tests/performance_tests.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import util
+import logging
+import zopkio.runtime as runtime
+from kafka import SimpleProducer, SimpleConsumer
+
+logger = logging.getLogger(__name__)
+
+JOB_ID = 'kafka-read-write-performance'
+PACKAGE_ID = 'tests'
+CONFIG_FILE = 'config/perf/kafka-read-write-performance.properties'
+TEST_INPUT_TOPIC = 'kafka-read-write-performance-input'
+TEST_OUTPUT_TOPIC = 'kafka-read-write-performance-output'
+NUM_MESSAGES = 1000000
+MESSAGE = 'a' * 200
+
+def test_kafka_read_write_performance():
+  """
+  Runs a Samza job that reads from Kafka, and writes back out to it. The 
+  writes/sec for the job is logged to the job's container.
+  """
+  _load_data()
+  util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
+  util.await_job(PACKAGE_ID, JOB_ID)
+
+def validate_kafka_read_write_performance():
+  """
+  Validates that all messages were sent to the output topic.
+  """
+  logger.info('Running validate_kafka_read_write_performance')
+  kafka = util.get_kafka_client()
+  kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
+  consumer = SimpleConsumer(
+    kafka, 
+    'samza-test-group', 
+    TEST_OUTPUT_TOPIC,
+    fetch_size_bytes=1000000,
+    buffer_size=32768,
+    max_buffer_size=None)
+  # wait 5 minutes to get all million messages
+  messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
+  message_count = len(messages)
+  assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found 
{1}'.format(NUM_MESSAGES, message_count)
+  kafka.close()
+
+def _load_data():
+  """
+  Sends 1 million messages to kafka-read-write-performance-input.
+  """
+  logger.info('Running test_kafka_read_write_performance')
+  kafka = util.get_kafka_client()
+  kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
+  producer = SimpleProducer(
+    kafka,
+    req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+    ack_timeout=30000,
+    batch_send=True,
+    batch_send_every_n=200)
+  logger.info('Loading {0} test messages.'.format(NUM_MESSAGES))
+  for i in range(0, NUM_MESSAGES):
+    if i % 100000 == 0:
+      logger.info('Loaded {0} messages.'.format(i))
+    producer.send_messages(TEST_INPUT_TOPIC, MESSAGE)
+  kafka.close()

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/smoke_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/smoke_tests.py 
b/samza-test/src/main/python/tests/smoke_tests.py
index 7aec4e0..53d5fa9 100644
--- a/samza-test/src/main/python/tests/smoke_tests.py
+++ b/samza-test/src/main/python/tests/smoke_tests.py
@@ -15,37 +15,29 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
-import time
+import util
 import logging
-import socket
-import errno
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
 import zopkio.runtime as runtime
+from kafka import SimpleProducer, SimpleConsumer
 
 logger = logging.getLogger(__name__)
 
-CWD = os.path.dirname(os.path.abspath(__file__))
-HOME_DIR = os.path.join(CWD, os.pardir)
-DATA_DIR = os.path.join(HOME_DIR, 'data')
-TEST_TOPIC = 'samza-test-topic'
+DEPLOYER = 'samza_job_deployer'
+JOB_ID = 'negate_number'
+PACKAGE_ID = 'tests'
+CONFIG_FILE = 'config/negate-number.properties'
+TEST_INPUT_TOPIC = 'samza-test-topic'
 TEST_OUTPUT_TOPIC = 'samza-test-topic-output'
 NUM_MESSAGES = 50
 
 def test_samza_job():
   """
-  Sends 50 messages (1 .. 50) to samza-test-topic.
+  Runs a job that reads and converts input strings to integers, negates the 
+  integer, and outputs to a Kafka topic.
   """
-  logger.info('Running test_samza_job')
-  kafka = _get_kafka_client()
-  kafka.ensure_topic_exists(TEST_TOPIC)
-  producer = SimpleProducer(kafka,
-    async=False,
-    req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
-    ack_timeout=30000)
-  for i in range(1, NUM_MESSAGES + 1):
-    producer.send_messages(TEST_TOPIC, str(i))
-  kafka.close()
+  _load_data()
+  util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE)
+  util.await_job(PACKAGE_ID, JOB_ID)
 
 def validate_samza_job():
   """
@@ -53,49 +45,28 @@ def validate_samza_job():
   samza-test-topic-output.
   """
   logger.info('Running validate_samza_job')
-  kafka = _get_kafka_client()
+  kafka = util.get_kafka_client()
   kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
   consumer = SimpleConsumer(kafka, 'samza-test-group', TEST_OUTPUT_TOPIC)
-  messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=60)
+  messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300)
   message_count = len(messages)
   assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found 
{1}'.format(NUM_MESSAGES, message_count)
   for message in map(lambda m: m.message.value, messages):
     assert int(message) < 0 , 'Expected negative integer but received 
{0}'.format(message)
   kafka.close()
 
-def _get_kafka_client(num_retries=20, retry_sleep=1):
-  """
-  Returns a KafkaClient based off of the kafka_hosts and kafka_port configs 
set 
-  in the active runtime.
-  """
-  kafka_hosts = runtime.get_active_config('kafka_hosts').values()
-  kafka_port = runtime.get_active_config('kafka_port')
-  assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
-  connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), 
kafka_hosts)).rstrip(',')
-  # wait for at least one broker to come up
-  if not _wait_for_server(kafka_hosts[0], kafka_port, 30):
-    raise Exception('Unable to connect to Kafka broker: 
{0}:{1}'.format(kafka_hosts[0], kafka_port))
-  return KafkaClient(connect_string)
-
-def _wait_for_server(host, port, timeout=5, retries=12):
+def _load_data():
   """
-  Keep trying to connect to a host port until the retry count has been reached.
+  Sends 50 messages (1 .. 50) to samza-test-topic.
   """
-  s = socket.socket()
-
-  for i in range(retries):
-    try:
-      s.settimeout(timeout)
-      s.connect((host, port))
-    except socket.timeout, err:
-      # Exception occurs if timeout is set. Wait and retry.
-      pass
-    except socket.error, err:
-      # Exception occurs if timeout > underlying network timeout. Wait and 
retry.
-      if type(err.args) != tuple or err[0] != errno.ETIMEDOUT:
-        raise
-    else:
-      s.close()
-      return True
-  return False
-
+  logger.info('Running test_samza_job')
+  kafka = util.get_kafka_client()
+  kafka.ensure_topic_exists(TEST_INPUT_TOPIC)
+  producer = SimpleProducer(
+    kafka,
+    async=False,
+    req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+    ack_timeout=30000)
+  for i in range(1, NUM_MESSAGES + 1):
+    producer.send_messages(TEST_INPUT_TOPIC, str(i))
+  kafka.close()

http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/util.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/util.py 
b/samza-test/src/main/python/tests/util.py
new file mode 100644
index 0000000..a0ed671
--- /dev/null
+++ b/samza-test/src/main/python/tests/util.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import socket
+import errno
+import zopkio.runtime as runtime
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
+from zopkio.runtime import get_active_config as c
+
+logger = logging.getLogger(__name__)
+
+DEPLOYER = 'samza_job_deployer'
+
+def start_job(package_id, job_id, config_file):
+  """
+  Start a Samza job.
+  """
+  logger.info('Starting {0}.{1}'.format(package_id, job_id))
+  samza_job_deployer = runtime.get_deployer(DEPLOYER)
+  samza_job_deployer.start(job_id, {
+    'package_id': package_id,
+    'config_file': config_file,
+  })
+
+def await_job(package_id, job_id):
+  """
+  Wait for a Samza job to finish.
+  """
+  logger.info('Awaiting {0}.{1}'.format(package_id, job_id))
+  samza_job_deployer = runtime.get_deployer(DEPLOYER)
+  samza_job_deployer.await(job_id, {
+    'package_id': package_id,
+  })
+
+def get_kafka_client(num_retries=20, retry_sleep=1):
+  """
+  Returns a KafkaClient based off of the kafka_hosts and kafka_port configs 
set 
+  in the active runtime.
+  """
+  kafka_hosts = runtime.get_active_config('kafka_hosts').values()
+  kafka_port = runtime.get_active_config('kafka_port')
+  assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
+  connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), 
kafka_hosts)).rstrip(',')
+  # wait for at least one broker to come up
+  if not wait_for_server(kafka_hosts[0], kafka_port, 30):
+    raise Exception('Unable to connect to Kafka broker: 
{0}:{1}'.format(kafka_hosts[0], kafka_port))
+  return KafkaClient(connect_string)
+
+def wait_for_server(host, port, timeout=5, retries=12):
+  """
+  Keep trying to connect to a host port until the retry count has been reached.
+  """
+  s = socket.socket()
+
+  for i in range(retries):
+    try:
+      s.settimeout(timeout)
+      s.connect((host, port))
+    except socket.timeout, err:
+      # Exception occurs if timeout is set. Wait and retry.
+      pass
+    except socket.error, err:
+      # Exception occurs if timeout > underlying network timeout. Wait and 
retry.
+      if type(err.args) != tuple or err[0] != errno.ETIMEDOUT:
+        raise
+    else:
+      s.close()
+      return True
+  return False

Reply via email to