Repository: samza Updated Branches: refs/heads/master c7ac26377 -> 41c74b968
SAMZA-548; add performance test for container with kafka consumer and producer Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/41c74b96 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/41c74b96 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/41c74b96 Branch: refs/heads/master Commit: 41c74b96876473acb6403e544bec7a00a04d2fa3 Parents: c7ac263 Author: Chris Riccomini <[email protected]> Authored: Fri Feb 13 17:10:03 2015 -0800 Committer: Chris Riccomini <[email protected]> Committed: Fri Feb 13 17:10:03 2015 -0800 ---------------------------------------------------------------------- samza-shell/src/main/bash/stat-yarn-job.sh | 21 +++++ .../src/main/config/negate-number.properties | 18 +---- .../kafka-read-write-performance.properties | 35 ++++++++ .../test/integration/NegateNumberTask.java | 44 +++++++++- .../src/main/python/configs/downloads.json | 2 +- samza-test/src/main/python/configs/kafka.json | 22 ++--- .../python/configs/smoke-tests/smoke-tests.json | 6 -- samza-test/src/main/python/configs/tests.json | 5 ++ samza-test/src/main/python/deployment.py | 21 ++--- .../src/main/python/samza_job_yarn_deployer.py | 47 ++++++++++- samza-test/src/main/python/tests.py | 3 +- .../src/main/python/tests/performance_tests.py | 80 +++++++++++++++++++ samza-test/src/main/python/tests/smoke_tests.py | 83 +++++++------------ samza-test/src/main/python/tests/util.py | 84 ++++++++++++++++++++ 14 files changed, 359 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-shell/src/main/bash/stat-yarn-job.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/stat-yarn-job.sh b/samza-shell/src/main/bash/stat-yarn-job.sh new file mode 100755 index 0000000..e5f6847 --- /dev/null +++ b/samza-shell/src/main/bash/stat-yarn-job.sh @@ 
-0,0 +1,21 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" + +exec $(dirname $0)/run-class.sh org.apache.hadoop.yarn.client.cli.ApplicationCLI application -status "$@" http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/negate-number.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties index 379fa61..b9f898c 100644 --- a/samza-test/src/main/config/negate-number.properties +++ b/samza-test/src/main/config/negate-number.properties @@ -21,18 +21,12 @@ job.name=samza-negate-number # YARN yarn.container.count=1 -yarn.container.memory.mb=1024 # Task task.class=org.apache.samza.test.integration.NegateNumberTask task.inputs=kafka.samza-test-topic -task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory -task.checkpoint.replication.factor=1 -task.checkpoint.system=kafka -task.lifecycle.listener.generator.class=com.linkedin.samza.task.GeneratorLifecycleListenerFactory 
-task.lifecycle.listener.generator.fabric=CORP-EAT1 -task.opts=-Xmx6g -task.command.class=org.apache.samza.job.ShellCommandBuilder +task.max.messages=50 +task.outputs=kafka.samza-test-topic-output # Serializers serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory @@ -43,12 +37,4 @@ systems.kafka.samza.msg.serde=string systems.kafka.samza.key.serde=string systems.kafka.samza.offset.default=oldest systems.kafka.consumer.zookeeper.connect=localhost:2181/ -systems.kafka.producer.compression.type=gzip systems.kafka.producer.bootstrap.servers=localhost:9092 -systems.kafka.producer.acks=1 -systems.kafka.producer.metadata.max.age.ms=86400000 -# Normally, we'd set this much higher, but we want things to look snappy in the demo. -systems.kafka.producer.buffer.memory=1000000 - -# negate-number -streams.samza-test-topic.consumer.reset.offset=true http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/config/perf/kafka-read-write-performance.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/perf/kafka-read-write-performance.properties b/samza-test/src/main/config/perf/kafka-read-write-performance.properties new file mode 100644 index 0000000..122b14a --- /dev/null +++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=kafka-read-write-performance + +# YARN +yarn.container.count=1 + +# Task +task.class=org.apache.samza.test.performance.TestPerformanceTask +task.inputs=kafka.kafka-read-write-performance-input +task.outputs=kafka.kafka-read-write-performance-output +task.max.messages=1000000 + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.offset.default=oldest +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java index 782e9f4..617cea6 100644 --- a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java +++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java @@ -19,22 +19,58 @@ package org.apache.samza.test.integration; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.task.InitableTask; import 
org.apache.samza.task.MessageCollector; import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.task.TaskCoordinator.RequestScope; +import org.apache.samza.util.Util; -/* - * A simple test job that reads strings, converts them to integers, multiplies +/** + * A simple test job that reads strings, converts them to integers, multiplies * by -1, and outputs to "samza-test-topic-output" stream. */ -public class NegateNumberTask implements StreamTask { +public class NegateNumberTask implements StreamTask, InitableTask { + /** + * How many messages all tasks in a single container have processed. + */ + private static int messagesProcessed = 0; + + /** + * How many messages to process before shutting down. + */ + private int maxMessages; + + /** + * The SystemStream to send negated numbers to. + */ + private SystemStream outputSystemStream; + + @Override + public void init(Config config, TaskContext context) throws Exception { + maxMessages = config.getInt("task.max.messages", 50); + String outputSystemStreamString = config.get("task.outputs", null); + if (outputSystemStreamString == null) { + throw new ConfigException("Missing required configuration: task.outputs"); + } + outputSystemStream = Util.getSystemStreamFromNames(outputSystemStreamString); + } + + @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + messagesProcessed += 1; String input = (String) envelope.getMessage(); Integer number = Integer.valueOf(input); Integer output = number.intValue() * -1; - collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "samza-test-topic-output"), output.toString())); + collector.send(new OutgoingMessageEnvelope(outputSystemStream, output.toString())); + if (messagesProcessed >= maxMessages) { + coordinator.shutdown(RequestScope.ALL_TASKS_IN_CONTAINER); + } } } 
http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/downloads.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json index 8ded306..a75756f 100644 --- a/samza-test/src/main/python/configs/downloads.json +++ b/samza-test/src/main/python/configs/downloads.json @@ -1,5 +1,5 @@ { - "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz", + "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.9.2-0.8.2.0.tgz", "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz", "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz" } http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/kafka.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json index 9a7af19..ab2f346 100644 --- a/samza-test/src/main/python/configs/kafka.json +++ b/samza-test/src/main/python/configs/kafka.json @@ -3,21 +3,21 @@ "kafka_instance_0": "localhost" }, "kafka_port": 9092, - "kafka_start_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.1.1/config/server.properties", - "kafka_stop_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh", + "kafka_start_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.2.0/config/server.properties", + "kafka_stop_cmd": "kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh", "kafka_install_path": "deploy/kafka", - "kafka_executable": "kafka_2.9.2-0.8.1.1.tgz", + "kafka_executable": "kafka_2.9.2-0.8.2.0.tgz", "kafka_post_install_cmds": [ - "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh", - "sed -i.bak 's/^num\\.partitions 
*=.*/num.partitions=1/' kafka_2.9.2-0.8.1.1/config/server.properties", - "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.1.1/config/server.properties" + "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.2.0/bin/kafka-server-stop.sh", + "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.2.0/config/server.properties", + "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.2.0/config/server.properties" ], "kafka_logs": [ "log-cleaner.log", - "kafka_2.9.2-0.8.1.1/logs/controller.log", - "kafka_2.9.2-0.8.1.1/logs/kafka-request.log", - "kafka_2.9.2-0.8.1.1/logs/kafkaServer-gc.log", - "kafka_2.9.2-0.8.1.1/logs/server.log", - "kafka_2.9.2-0.8.1.1/logs/state-change.log" + "kafka_2.9.2-0.8.2.0/logs/controller.log", + "kafka_2.9.2-0.8.2.0/logs/kafka-request.log", + "kafka_2.9.2-0.8.2.0/logs/kafkaServer-gc.log", + "kafka_2.9.2-0.8.2.0/logs/server.log", + "kafka_2.9.2-0.8.2.0/logs/state-change.log" ] } http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json b/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json deleted file mode 100644 index 65f8568..0000000 --- a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz", - "samza_install_path": "deploy/smoke_tests", - "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory", - "samza_config_file": "config/negate-number.properties" -} http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/configs/tests.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/tests.json b/samza-test/src/main/python/configs/tests.json new file mode 100644 index 
0000000..5251af9 --- /dev/null +++ b/samza-test/src/main/python/configs/tests.json @@ -0,0 +1,5 @@ +{ + "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz", + "samza_install_path": "deploy/smoke_tests", + "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory" +} http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/deployment.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py index a0e1481..89ba728 100644 --- a/samza-test/src/main/python/deployment.py +++ b/samza-test/src/main/python/deployment.py @@ -76,36 +76,25 @@ def setup_suite(): 'hostname': host }) - # Start the Samza jobs. + # Setup Samza job deployer. samza_job_deployer = SamzaJobYarnDeployer({ + 'config_factory': c('samza_config_factory'), 'yarn_site_template': c('yarn_site_template'), 'yarn_driver_configs': c('yarn_driver_configs'), 'yarn_nm_hosts': c('yarn_nm_hosts').values(), 'install_path': samza_install_path, }) - samza_job_deployer.install('smoke_tests', { + samza_job_deployer.install('tests', { 'executable': c('samza_executable'), }) - samza_job_deployer.start('negate_number', { - 'package_id': 'smoke_tests', - 'config_factory': c('samza_config_factory'), - 'config_file': c('samza_config_file'), - 'install_path': samza_install_path, - }) + runtime.set_deployer('samza_job_deployer', samza_job_deployer) def teardown_suite(): - # Stop the samza jobs. - samza_job_deployer.stop('negate_number', { - 'package_id': 'smoke_tests', - 'install_path': samza_install_path, - }) - - samza_job_deployer.uninstall('smoke_tests') + samza_job_deployer.uninstall('tests') # Undeploy everything. 
for name, deployer in deployers.iteritems(): for instance, host in c(name + '_hosts').iteritems(): deployer.undeploy(instance) - http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/samza_job_yarn_deployer.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/samza_job_yarn_deployer.py b/samza-test/src/main/python/samza_job_yarn_deployer.py index e18bc58..38635ca 100644 --- a/samza-test/src/main/python/samza_job_yarn_deployer.py +++ b/samza-test/src/main/python/samza_job_yarn_deployer.py @@ -39,7 +39,7 @@ class SamzaJobYarnDeployer(Deployer): to start and stop Samza jobs in a YARN grid. param: configs -- Map of config key/values pairs. These configs will be used - as a default whenever overrides are not provided in the methods (intall, + as a default whenever overrides are not provided in the methods (install, start, stop, etc) below. """ logging.getLogger("paramiko").setLevel(logging.ERROR) @@ -173,6 +173,47 @@ class SamzaJobYarnDeployer(Deployer): p.wait() assert p.returncode == 0, "Command returned non-zero exit code ({0}): {1}".format(p.returncode, command) + def await(self, job_id, configs={}): + """ + Waits for a Samza job to finish using bin/stat-yarn-job.sh. A job is + finished when its "Final State" is not "UNDEFINED". + + param: job_id -- A unique ID used to identify a Samza job. + param: configs -- Map of config key/values pairs. Valid keys include: + + package_id: The package_id for the package that contains the code for job_id. + Usually, the package_id refers to the .tgz job tarball that contains the + code necessary to run job_id. + """ + configs = self._get_merged_configs(configs) + self._validate_configs(configs, ['package_id']) + + # Get configs. + package_id = configs.get('package_id') + + # Get the application_id for the job. + application_id = self.app_ids.get(job_id) + + # Stat the job, if it's been started, or WARN and return if it hasn't. 
+ final_state = 'UNDEFINED' + if not application_id: + logger.warn("Can't stat a job that was never started: {0}".format(job_id)) + else: + command = "{0} {1}".format(os.path.join(package_id, "bin/stat-yarn-job.sh"), application_id) + env = self._get_env_vars(package_id) + while final_state == 'UNDEFINED': + p = Popen(command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env) + output, err = p.communicate() + logger.debug("Output from run-job.sh:\nstdout: {0}\nstderr: {1}".format(output, err)) + assert p.returncode == 0, "Command ({0}) returned non-zero exit code ({1}).\nstdout: {2}\nstderr: {3}".format(command, p.returncode, output, err) + + # Check the final state for the job. + regex = r'.*Final.State . (\w*)' + match = re.match(regex, output.replace("\n", ' ')) + final_state = match.group(1) + logger.debug("Got final state {0} for job_id {1}.".format(final_state, job_id)) + return final_state + def uninstall(self, package_id, configs={}): """ Removes the install path for package_id from all remote hosts that it's been @@ -201,6 +242,10 @@ class SamzaJobYarnDeployer(Deployer): # TODO we should implement the below helper methods over time, as we need them. 
+ def get_processes(self): + # TODO raise NotImplementedError + return [] + def get_pid(self, container_id, configs={}): raise NotImplementedError http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/tests.py b/samza-test/src/main/python/tests.py index dae414e..df64e23 100644 --- a/samza-test/src/main/python/tests.py +++ b/samza-test/src/main/python/tests.py @@ -24,6 +24,7 @@ test = { 'perf_code': os.path.join(dir, 'perf.py'), 'configs_directory': os.path.join(dir, 'configs'), 'test_code': [ - os.path.join(dir, 'tests', 'smoke_tests.py') + os.path.join(dir, 'tests', 'smoke_tests.py'), + os.path.join(dir, 'tests', 'performance_tests.py'), ], } http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/performance_tests.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/tests/performance_tests.py b/samza-test/src/main/python/tests/performance_tests.py new file mode 100644 index 0000000..a97717f --- /dev/null +++ b/samza-test/src/main/python/tests/performance_tests.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import util +import logging +import zopkio.runtime as runtime +from kafka import SimpleProducer, SimpleConsumer + +logger = logging.getLogger(__name__) + +JOB_ID = 'kafka-read-write-performance' +PACKAGE_ID = 'tests' +CONFIG_FILE = 'config/perf/kafka-read-write-performance.properties' +TEST_INPUT_TOPIC = 'kafka-read-write-performance-input' +TEST_OUTPUT_TOPIC = 'kafka-read-write-performance-output' +NUM_MESSAGES = 1000000 +MESSAGE = 'a' * 200 + +def test_kafka_read_write_performance(): + """ + Runs a Samza job that reads from Kafka, and writes back out to it. The + writes/sec for the job is logged to the job's container. + """ + _load_data() + util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE) + util.await_job(PACKAGE_ID, JOB_ID) + +def validate_kafka_read_write_performance(): + """ + Validates that all messages were sent to the output topic. + """ + logger.info('Running validate_kafka_read_write_performance') + kafka = util.get_kafka_client() + kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC) + consumer = SimpleConsumer( + kafka, + 'samza-test-group', + TEST_OUTPUT_TOPIC, + fetch_size_bytes=1000000, + buffer_size=32768, + max_buffer_size=None) + # wait 5 minutes to get all million messages + messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300) + message_count = len(messages) + assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count) + kafka.close() + +def _load_data(): + """ + Sends 1 million messages to kafka-read-write-performance-input. 
+ """ + logger.info('Running test_kafka_read_write_performance') + kafka = util.get_kafka_client() + kafka.ensure_topic_exists(TEST_INPUT_TOPIC) + producer = SimpleProducer( + kafka, + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, + ack_timeout=30000, + batch_send=True, + batch_send_every_n=200) + logger.info('Loading {0} test messages.'.format(NUM_MESSAGES)) + for i in range(0, NUM_MESSAGES): + if i % 100000 == 0: + logger.info('Loaded {0} messages.'.format(i)) + producer.send_messages(TEST_INPUT_TOPIC, MESSAGE) + kafka.close() http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/smoke_tests.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/tests/smoke_tests.py b/samza-test/src/main/python/tests/smoke_tests.py index 7aec4e0..53d5fa9 100644 --- a/samza-test/src/main/python/tests/smoke_tests.py +++ b/samza-test/src/main/python/tests/smoke_tests.py @@ -15,37 +15,29 @@ # specific language governing permissions and limitations # under the License. -import os -import time +import util import logging -import socket -import errno -from kafka import KafkaClient, SimpleProducer, SimpleConsumer import zopkio.runtime as runtime +from kafka import SimpleProducer, SimpleConsumer logger = logging.getLogger(__name__) -CWD = os.path.dirname(os.path.abspath(__file__)) -HOME_DIR = os.path.join(CWD, os.pardir) -DATA_DIR = os.path.join(HOME_DIR, 'data') -TEST_TOPIC = 'samza-test-topic' +DEPLOYER = 'samza_job_deployer' +JOB_ID = 'negate_number' +PACKAGE_ID = 'tests' +CONFIG_FILE = 'config/negate-number.properties' +TEST_INPUT_TOPIC = 'samza-test-topic' TEST_OUTPUT_TOPIC = 'samza-test-topic-output' NUM_MESSAGES = 50 def test_samza_job(): """ - Sends 50 messages (1 .. 50) to samza-test-topic. + Runs a job that reads converts input strings to integers, negates the + integer, and outputs to a Kafka topic. 
""" - logger.info('Running test_samza_job') - kafka = _get_kafka_client() - kafka.ensure_topic_exists(TEST_TOPIC) - producer = SimpleProducer(kafka, - async=False, - req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, - ack_timeout=30000) - for i in range(1, NUM_MESSAGES + 1): - producer.send_messages(TEST_TOPIC, str(i)) - kafka.close() + _load_data() + util.start_job(PACKAGE_ID, JOB_ID, CONFIG_FILE) + util.await_job(PACKAGE_ID, JOB_ID) def validate_samza_job(): """ @@ -53,49 +45,28 @@ def validate_samza_job(): samza-test-topic-output. """ logger.info('Running validate_samza_job') - kafka = _get_kafka_client() + kafka = util.get_kafka_client() kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC) consumer = SimpleConsumer(kafka, 'samza-test-group', TEST_OUTPUT_TOPIC) - messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=60) + messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=300) message_count = len(messages) assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count) for message in map(lambda m: m.message.value, messages): assert int(message) < 0 , 'Expected negative integer but received {0}'.format(message) kafka.close() -def _get_kafka_client(num_retries=20, retry_sleep=1): - """ - Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set - in the active runtime. 
- """ - kafka_hosts = runtime.get_active_config('kafka_hosts').values() - kafka_port = runtime.get_active_config('kafka_port') - assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts' - connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), kafka_hosts)).rstrip(',') - # wait for at least one broker to come up - if not _wait_for_server(kafka_hosts[0], kafka_port, 30): - raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port)) - return KafkaClient(connect_string) - -def _wait_for_server(host, port, timeout=5, retries=12): +def _load_data(): """ - Keep trying to connect to a host port until the retry count has been reached. + Sends 50 messages (1 .. 50) to samza-test-topic. """ - s = socket.socket() - - for i in range(retries): - try: - s.settimeout(timeout) - s.connect((host, port)) - except socket.timeout, err: - # Exception occurs if timeout is set. Wait and retry. - pass - except socket.error, err: - # Exception occurs if timeout > underlying network timeout. Wait and retry. 
- if type(err.args) != tuple or err[0] != errno.ETIMEDOUT: - raise - else: - s.close() - return True - return False - + logger.info('Running test_samza_job') + kafka = util.get_kafka_client() + kafka.ensure_topic_exists(TEST_INPUT_TOPIC) + producer = SimpleProducer( + kafka, + async=False, + req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, + ack_timeout=30000) + for i in range(1, NUM_MESSAGES + 1): + producer.send_messages(TEST_INPUT_TOPIC, str(i)) + kafka.close() http://git-wip-us.apache.org/repos/asf/samza/blob/41c74b96/samza-test/src/main/python/tests/util.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/tests/util.py b/samza-test/src/main/python/tests/util.py new file mode 100644 index 0000000..a0ed671 --- /dev/null +++ b/samza-test/src/main/python/tests/util.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import logging +import socket +import errno +import zopkio.runtime as runtime +from kafka import KafkaClient, SimpleProducer, SimpleConsumer +from zopkio.runtime import get_active_config as c + +logger = logging.getLogger(__name__) + +DEPLOYER = 'samza_job_deployer' + +def start_job(package_id, job_id, config_file): + """ + Start a Samza job. + """ + logger.info('Starting {0}.{1}'.format(package_id, job_id)) + samza_job_deployer = runtime.get_deployer(DEPLOYER) + samza_job_deployer.start(job_id, { + 'package_id': package_id, + 'config_file': config_file, + }) + +def await_job(package_id, job_id): + """ + Wait for a Samza job to finish. + """ + logger.info('Awaiting {0}.{1}'.format(package_id, job_id)) + samza_job_deployer = runtime.get_deployer(DEPLOYER) + samza_job_deployer.await(job_id, { + 'package_id': package_id, + }) + +def get_kafka_client(num_retries=20, retry_sleep=1): + """ + Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set + in the active runtime. + """ + kafka_hosts = runtime.get_active_config('kafka_hosts').values() + kafka_port = runtime.get_active_config('kafka_port') + assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts' + connect_string = ','.join(map(lambda h: h + ':{0},'.format(kafka_port), kafka_hosts)).rstrip(',') + # wait for at least one broker to come up + if not wait_for_server(kafka_hosts[0], kafka_port, 30): + raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port)) + return KafkaClient(connect_string) + +def wait_for_server(host, port, timeout=5, retries=12): + """ + Keep trying to connect to a host port until the retry count has been reached. + """ + s = socket.socket() + + for i in range(retries): + try: + s.settimeout(timeout) + s.connect((host, port)) + except socket.timeout, err: + # Exception occurs if timeout is set. Wait and retry. + pass + except socket.error, err: + # Exception occurs if timeout > underlying network timeout. 
Wait and retry. + if type(err.args) != tuple or err[0] != errno.ETIMEDOUT: + raise + else: + s.close() + return True + return False
