Repository: incubator-samza Updated Branches: refs/heads/master 0ebfcbd0a -> 6bbbaa597
SAMZA-468; create an integration test suite for samza. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/6bbbaa59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/6bbbaa59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/6bbbaa59 Branch: refs/heads/master Commit: 6bbbaa597612febaad082aa7cb3b8bf0ac9610e7 Parents: 0ebfcbd Author: Navina Ramesh <nram...@linkedin.com> Authored: Wed Jan 14 09:18:57 2015 -0800 Committer: Chris Riccomini <cricc...@criccomi-mn.linkedin.biz> Committed: Wed Jan 14 09:18:57 2015 -0800 ---------------------------------------------------------------------- .gitignore | 1 + README.md | 4 + bin/integration-tests.sh | 86 +++++++ build.gradle | 19 ++ docs/contribute/tests.md | 46 +++- .../src/main/config/hello-stateful-world.samsa | 31 +++ samza-test/src/main/config/join/checker.samsa | 36 +++ samza-test/src/main/config/join/emitter.samsa | 33 +++ samza-test/src/main/config/join/joiner.samsa | 31 +++ samza-test/src/main/config/join/watcher.samsa | 31 +++ .../src/main/config/negate-number.properties | 55 ++++ samza-test/src/main/config/perf/counter.samsa | 30 +++ .../src/main/config/perf/kv-perf.properties | 21 ++ .../test/integration/NegateNumberTask.java | 40 +++ .../src/main/python/configs/downloads.json | 5 + samza-test/src/main/python/configs/kafka.json | 23 ++ .../python/configs/smoke-tests/smoke-tests.json | 6 + samza-test/src/main/python/configs/yarn.json | 38 +++ .../src/main/python/configs/zookeeper.json | 16 ++ samza-test/src/main/python/deployment.py | 111 ++++++++ samza-test/src/main/python/perf.py | 50 ++++ samza-test/src/main/python/requirements.txt | 21 ++ .../src/main/python/samza_job_yarn_deployer.py | 255 +++++++++++++++++++ samza-test/src/main/python/templates.py | 17 ++ .../src/main/python/templates/yarn-site.xml | 27 ++ samza-test/src/main/python/tests.py | 29 +++ 
samza-test/src/main/python/tests/smoke_tests.py | 101 ++++++++ .../main/resources/hello-stateful-world.samsa | 31 --- .../src/main/resources/join/checker.samsa | 36 --- .../src/main/resources/join/emitter.samsa | 33 --- samza-test/src/main/resources/join/joiner.samsa | 31 --- .../src/main/resources/join/watcher.samsa | 31 --- .../src/main/resources/perf/counter.samsa | 30 --- .../src/main/resources/perf/kv-perf.properties | 21 -- 34 files changed, 1162 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index dfc6533..3e97416 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,4 @@ docs/learn/documentation/*/api/javadocs .DS_Store out/ *.patch +**.pyc http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 4453984..f37cc03 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,10 @@ To run key-value performance tests: ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/resources/perf/kv-perf.properties +To run all integration tests: + + ./bin/integration-tests.sh <dir> + ### Job Management To run a job (defined in a properties file): http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/bin/integration-tests.sh ---------------------------------------------------------------------- diff --git a/bin/integration-tests.sh b/bin/integration-tests.sh new file mode 100755 index 0000000..389479b --- /dev/null +++ b/bin/integration-tests.sh @@ -0,0 +1,86 @@ +#!/bin/bash -e +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +BASE_DIR=$DIR/.. +TEST_DIR=$1 + +if test -z "$TEST_DIR"; then + echo + echo " USAGE:" + echo + echo " ${BASH_SOURCE[0]##*/} \"<dirname to run tests in>\" [zopkio args...]" + echo + exit 0 +fi + +# always use absolute paths for ABS_TEST_DIR +ABS_TEST_DIR=$(cd $(dirname $TEST_DIR); pwd)/$(basename $TEST_DIR) +SCRIPTS_DIR=$ABS_TEST_DIR/scripts + +# safety check for virtualenv +if [ -f $HOME/.pydistutils.cfg ]; then + echo "Virtualenv can't run while $HOME/.pydistutils.cfg exists." + echo "Please remove $HOME/.pydistutils.cfg, and try again." + exit 0 +fi + +# build integration test tarball +./gradlew releaseTestJobs + +# create integration test directory +mkdir -p $ABS_TEST_DIR +rm -rf $SCRIPTS_DIR +cp -r samza-test/src/main/python/ $SCRIPTS_DIR +cp ./samza-test/build/distributions/samza-test*.tgz $ABS_TEST_DIR +cd $ABS_TEST_DIR + +# setup virtualenv locally if it's not already there +VIRTUAL_ENV=virtualenv-12.0.2 +if [[ ! -d "${ABS_TEST_DIR}/${VIRTUAL_ENV}" ]] ; then + curl -O https://pypi.python.org/packages/source/v/virtualenv/$VIRTUAL_ENV.tar.gz + tar xvfz $VIRTUAL_ENV.tar.gz +fi + +# build a clean virtual environment +SAMZA_INTEGRATION_TESTS_DIR=$ABS_TEST_DIR/samza-integration-tests +if [[ ! 
-d "${SAMZA_INTEGRATION_TESTS_DIR}" ]] ; then + python $VIRTUAL_ENV/virtualenv.py $SAMZA_INTEGRATION_TESTS_DIR +fi + +# activate the virtual environment +source $SAMZA_INTEGRATION_TESTS_DIR/bin/activate + +# install zopkio and requests +pip install -r $SCRIPTS_DIR/requirements.txt + +# treat all trailing parameters (after dirname) as zopkio switches +shift +SWITCHES="$*" + +# default to info-level debugging if not specified +if [[ $SWITCHES != *"console-log-level"* ]]; then + SWITCHES="$SWITCHES --console-log-level INFO" +fi + +# run the tests +zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/tests.py + +# go back to execution directory +deactivate +cd $DIR http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 38383bd..7a40ad4 100644 --- a/build.gradle +++ b/build.gradle @@ -206,6 +206,8 @@ project(":samza-yarn_$scalaVersion") { compile("org.apache.hadoop:hadoop-common:$yarnVersion") { exclude module: 'slf4j-log4j12' exclude module: 'servlet-api' + // Exclude because YARN's 3.4.5 ZK version is incompatbile with Kafka's 3.3.4. 
+ exclude module: 'zookeeper' } compile("org.scalatra:scalatra_$scalaVersion:$scalatraVersion") { exclude module: 'scala-compiler' @@ -216,6 +218,7 @@ project(":samza-yarn_$scalaVersion") { exclude module: 'slf4j-api' } compile "joda-time:joda-time:$jodaTimeVersion" + compile "org.apache.zookeeper:zookeeper:$zookeeperVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } @@ -362,6 +365,9 @@ project(":samza-test_$scalaVersion") { compile project(":samza-kv-leveldb_$scalaVersion") compile project(":samza-kv-rocksdb_$scalaVersion") compile project(":samza-core_$scalaVersion") + runtime project(":samza-log4j") + runtime project(":samza-yarn_$scalaVersion") + runtime project(":samza-kafka_$scalaVersion") compile "org.scala-lang:scala-library:$scalaLibVersion" compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "javax.mail:mail:1.4" @@ -386,4 +392,17 @@ project(":samza-test_$scalaVersion") { // useful for configuring TestSamzaContainerPerformance from the CLI. 
systemProperties = System.properties.findAll { it.key.startsWith("samza") } } + + tasks.create(name: "releaseTestJobs", dependsOn: configurations.archives.artifacts, type: Tar) { + compression = Compression.GZIP + from(file("$projectDir/src/main/config")) { into "config/" } + from(file("$projectDir/src/main/resources")) { into "lib/" } + from(project(':samza-shell').file("src/main/bash")) { into "bin/" } + from(project(':samza-shell').file("src/main/resources")) { into "lib/" } + from(project(':samza-shell').file("src/main/resources/log4j-console.xml")) { into "bin/" } + from '../LICENSE' + from '../NOTICE' + from(configurations.runtime) { into("lib/") } + from(configurations.archives.artifacts.files) { into("lib/") } + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/docs/contribute/tests.md ---------------------------------------------------------------------- diff --git a/docs/contribute/tests.md b/docs/contribute/tests.md index 84e1dbd..237d9f7 100644 --- a/docs/contribute/tests.md +++ b/docs/contribute/tests.md @@ -19,7 +19,7 @@ title: Tests limitations under the License. --> -Samza's unit tests are written on top of [JUnit](http://junit.org/), and license checking is done with [Apache Rat](http://creadur.apache.org/rat/). An extensive integration test suite is not currently available. This is being actively worked on in [SAMZA-6](https://issues.apache.org/jira/browse/SAMZA-6) and [SAMZA-14](https://issues.apache.org/jira/browse/SAMZA-14). +Samza's unit tests are written on top of [JUnit](http://junit.org/), and license checking is done with [Apache Rat](http://creadur.apache.org/rat/). Samza's integration tests are written on top of [Zopkio](https://github.com/linkedin/Zopkio). 
### Running Unit Tests Locally @@ -67,3 +67,47 @@ On Mac, check-all.sh will default to the appropriate path for each environment v [Travis CI](https://travis-ci.org/apache/incubator-samza) has been configured to run Samza's unit tests after every commit to Samza's [master branch](https://git-wip-us.apache.org/repos/asf?p=incubator-samza.git;a=tree). The test results are mailed to the [developer mailing list](/community/mailing-lists.html), and posted in the [IRC channel](/community/irc.html). [](https://travis-ci.org/apache/incubator-samza) + +### Running Integration Tests Locally + +Samza uses [Zopkio](https://github.com/linkedin/Zopkio) to deploy and execute its integration tests. Integration tests can be executed by running: + + ./bin/integration-tests.sh /tmp/samza-tests + +The parameter defines where the integration tests should install packages both locally and on remote systems. Executing this command will: + +1. Build a samza-test job tarball. +2. Download and install YARN, Kafka, and ZooKeeper. +3. Deploy the samza-test job tarball to all YARN NM machines. +4. Start all Samza integration test jobs. +5. Feed input data to the jobs, and verify the results. +6. Open a report, and aggregate all remote logs. + +The default configurations that ship with Samza deploy all software, and run all tests locally on the machine from which the `integration-tests.sh` command was executed. + +The integration tests use SSH to interact with remote machines (and localhost). This means that you need an authentication mechanism when connecting to the machines. The two authentication mechanisms provided are: + +1. Interactive +2. Public key + +#### Interactive + +Zopkio will prompt you for a password by default. This password will be used as the SSH password when trying to log into remote systems. + +#### Public Key + +Zopkio supports public key authentication if you prefer to use it, or if your environment doesn't allow interactive authentication. 
To use public key authentication, add your public SSH key to ~/.ssh/authorized\_keys, and SSH to all of the machines that you'll be deploying to (localhost by default). See [here](http://www.linuxproblem.org/art_9.html) for details. + +Once this is done, you can run Zopkio with the \-\-nopassword parameter: + + ./bin/integration-tests.sh /tmp/samza-tests --nopassword + +This will skip the password prompt, and force Zopkio to try public key authentication. + +#### Console Logging + +The integration-tests.sh script will set the console log level to INFO by default. The level can be changed with: + + ./bin/integration-tests.sh /tmp/samza-tests --console-log-level DEBUG + +Changing this setting will define how verbose Zopkio is during test execution. It does not affect any of the log4j.xml settings in Samza, YARN, Kafka, or ZooKeeper. http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/hello-stateful-world.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/hello-stateful-world.samsa b/samza-test/src/main/config/hello-stateful-world.samsa new file mode 100644 index 0000000..745f881 --- /dev/null +++ b/samza-test/src/main/config/hello-stateful-world.samsa @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.factory.class=samza.job.local.ThreadJobFactory +job.name=hello-stateful-world + +# Task +task.class=samza.test.integration.SimpleStatefulTask +task.inputs=kafka.input + +# Stores +stores.mystore.factory=samza.storage.kv.KeyValueStorageEngineFactory +stores.mystore.key.serde=string +stores.mystore.msg.serde=string +stores.mystore.changelog=kafka.mystore http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/join/checker.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/join/checker.samsa b/samza-test/src/main/config/join/checker.samsa new file mode 100644 index 0000000..6a6b9cd --- /dev/null +++ b/samza-test/src/main/config/join/checker.samsa @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.name=checker + +systems.kafka.partitioner.class=samza.test.integration.join.EpochPartitioner + +# Task +task.class=samza.test.integration.join.Checker +task.inputs=kafka.completed-keys + +stores.checker-state.factory=samza.storage.kv.KeyValueStorageEngineFactory +stores.checker-state.key.serde=string +stores.checker-state.msg.serde=string +stores.checker-state.changelog=kafka.checker-state + +task.window.ms=300000 + +num.partitions=4 +expected.keys=100000 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/join/emitter.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/join/emitter.samsa b/samza-test/src/main/config/join/emitter.samsa new file mode 100644 index 0000000..5e94322 --- /dev/null +++ b/samza-test/src/main/config/join/emitter.samsa @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.name=emitter + +# Task +task.class=samza.test.integration.join.Emitter +task.inputs=kafka.epoch + +stores.emitter-state.factory=samza.storage.kv.KeyValueStorageEngineFactory +stores.emitter-state.key.serde=string +stores.emitter-state.msg.serde=string +stores.emitter-state.changelog=kafka.emitter-state + +task.window.ms=0 + +count=100000 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/join/joiner.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/join/joiner.samsa b/samza-test/src/main/config/join/joiner.samsa new file mode 100644 index 0000000..4ecee2b --- /dev/null +++ b/samza-test/src/main/config/join/joiner.samsa @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.name=joiner + +# Task +task.class=samza.test.integration.join.Joiner +task.inputs=kafka.emitted + +stores.joiner-state.factory=samza.storage.kv.KeyValueStorageEngineFactory +stores.joiner-state.key.serde=string +stores.joiner-state.msg.serde=string +stores.joiner-state.changelog=kafka.checker-state + +num.partitions=4 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/join/watcher.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/join/watcher.samsa b/samza-test/src/main/config/join/watcher.samsa new file mode 100644 index 0000000..025f055 --- /dev/null +++ b/samza-test/src/main/config/join/watcher.samsa @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.name=watcher + +# Task +task.class=samza.test.integration.join.Joiner +task.inputs=kafka.epoch + +task.window.ms=300000 + +max.time.between.epochs.ms=600000 +mail.smtp.host=TODO +mail.to=d...@samza.incubator.apache.org +mail.from=gre...@incubator.apache.org http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/negate-number.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties new file mode 100644 index 0000000..4989b27 --- /dev/null +++ b/samza-test/src/main/config/negate-number.properties @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=samza-negate-number + +# YARN +yarn.container.count=1 +yarn.container.memory.mb=1024 + +# Task +task.class=org.apache.samza.test.integration.NegateNumberTask +task.inputs=kafka.samza-test-topic +task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory +task.checkpoint.replication.factor=1 +task.checkpoint.system=kafka +task.lifecycle.listener.generator.class=com.linkedin.samza.task.GeneratorLifecycleListenerFactory +task.lifecycle.listener.generator.fabric=CORP-EAT1 +task.opts=-Xmx6g +task.command.class=org.apache.samza.job.ShellCommandBuilder + +# Serializers +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=string +systems.kafka.samza.key.serde=string +systems.kafka.samza.offset.default=oldest +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.compression.codec=gzip +systems.kafka.producer.metadata.broker.list=localhost:9092 +systems.kafka.producer.request.required.acks=1 +systems.kafka.producer.topic.metadata.refresh.interval.ms=86400000 +systems.kafka.producer.producer.type=sync +# Normally, we'd set this much higher, but we want things to look snappy in the demo. 
+systems.kafka.producer.batch.num.messages=1 + +# negate-number +streams.samza-test-topic.consumer.reset.offset=true http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/perf/counter.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/perf/counter.samsa b/samza-test/src/main/config/perf/counter.samsa new file mode 100644 index 0000000..6e80a22 --- /dev/null +++ b/samza-test/src/main/config/perf/counter.samsa @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.name=counter-task + +# Task +task.class=samza.test.integration.StatePerfTestTask +task.inputs=kafka.input + +# Stores +stores.mystore.factory=samza.storage.kv.KeyValueStorageEngineFactory +stores.mystore.key.serde=string +stores.mystore.msg.serde=string +stores.mystore.changelog=kafka.mystore http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/config/perf/kv-perf.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties new file mode 100644 index 0000000..dcc223f --- /dev/null +++ b/samza-test/src/main/config/perf/kv-perf.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory +stores.test.compaction.delete.threshold=1000 +test.partition.count=4 +test.num.loops=1000 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java new file mode 100644 index 0000000..782e9f4 --- /dev/null +++ b/samza-test/src/main/java/org/apache/samza/test/integration/NegateNumberTask.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.test.integration; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; + +/* + * A simple test job that reads strings, converts them to integers, multiplies + * by -1, and outputs to "samza-test-topic-output" stream. + */ +public class NegateNumberTask implements StreamTask { + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + String input = (String) envelope.getMessage(); + Integer number = Integer.valueOf(input); + Integer output = number.intValue() * -1; + collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "samza-test-topic-output"), output.toString())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/configs/downloads.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/downloads.json b/samza-test/src/main/python/configs/downloads.json new file mode 100644 index 0000000..8ded306 --- /dev/null +++ b/samza-test/src/main/python/configs/downloads.json @@ -0,0 +1,5 @@ +{ + "url_kafka": "http://www.us.apache.org/dist/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz", + "url_zookeeper": "http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz", + "url_hadoop": "https://archive.apache.org/dist/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz" +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/configs/kafka.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json new file mode 100644 index 0000000..9a7af19 --- 
/dev/null +++ b/samza-test/src/main/python/configs/kafka.json @@ -0,0 +1,23 @@ +{ + "kafka_hosts": { + "kafka_instance_0": "localhost" + }, + "kafka_port": 9092, + "kafka_start_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh -daemon kafka_2.9.2-0.8.1.1/config/server.properties", + "kafka_stop_cmd": "kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh", + "kafka_install_path": "deploy/kafka", + "kafka_executable": "kafka_2.9.2-0.8.1.1.tgz", + "kafka_post_install_cmds": [ + "sed -i.bak 's/SIGINT/SIGTERM/g' kafka_2.9.2-0.8.1.1/bin/kafka-server-stop.sh", + "sed -i.bak 's/^num\\.partitions *=.*/num.partitions=1/' kafka_2.9.2-0.8.1.1/config/server.properties", + "sed -i.bak 's/.*log.dirs.*/log.dirs=data/g' kafka_2.9.2-0.8.1.1/config/server.properties" + ], + "kafka_logs": [ + "log-cleaner.log", + "kafka_2.9.2-0.8.1.1/logs/controller.log", + "kafka_2.9.2-0.8.1.1/logs/kafka-request.log", + "kafka_2.9.2-0.8.1.1/logs/kafkaServer-gc.log", + "kafka_2.9.2-0.8.1.1/logs/server.log", + "kafka_2.9.2-0.8.1.1/logs/state-change.log" + ] +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json b/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json new file mode 100644 index 0000000..65f8568 --- /dev/null +++ b/samza-test/src/main/python/configs/smoke-tests/smoke-tests.json @@ -0,0 +1,6 @@ +{ + "samza_executable": "samza-test_2.10-0.9.0-SNAPSHOT.tgz", + "samza_install_path": "deploy/smoke_tests", + "samza_config_factory": "org.apache.samza.config.factories.PropertiesConfigFactory", + "samza_config_file": "config/negate-number.properties" +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/configs/yarn.json ---------------------------------------------------------------------- diff --git 
a/samza-test/src/main/python/configs/yarn.json b/samza-test/src/main/python/configs/yarn.json new file mode 100644 index 0000000..dc9a58e --- /dev/null +++ b/samza-test/src/main/python/configs/yarn.json @@ -0,0 +1,38 @@ +{ + "yarn_site_template": "scripts/templates/yarn-site.xml", + "yarn_rm_hosts": { + "yarn_rm_instance_0": "localhost" + }, + "yarn_rm_start_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh start resourcemanager", + "yarn_rm_stop_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh stop resourcemanager", + "yarn_rm_install_path": "deploy/yarn_rm", + "yarn_rm_post_install_cmds": [ + "sed -i.bak '/<configuration>/a <property><name>yarn.nodemanager.vmem-pmem-ratio</name><value>10</value></property>' hadoop-2.4.0/etc/hadoop/yarn-site.xml", + "mkdir -p hadoop-2.4.0/conf", + "chmod 755 hadoop-2.4.0/conf", + "cp hadoop-2.4.0/etc/hadoop/yarn-site.xml hadoop-2.4.0/conf/yarn-site.xml" + ], + "yarn_rm_executable": "hadoop-2.4.0.tar.gz", + "yarn_rm_logs": [ + "hadoop-2.4.0/logs" + ], + "yarn_nm_hosts": { + "yarn_nm_instance_0": "localhost" + }, + "yarn_nm_start_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh start nodemanager", + "yarn_nm_stop_cmd": "hadoop-2.4.0/sbin/yarn-daemon.sh stop nodemanager", + "yarn_nm_install_path": "deploy/yarn_nm", + "yarn_nm_post_install_cmds": [ + "sed -i.bak '/<configuration>/a <property><name>yarn.nodemanager.vmem-pmem-ratio</name><value>10</value></property>' hadoop-2.4.0/etc/hadoop/yarn-site.xml", + "mkdir -p hadoop-2.4.0/conf", + "chmod 755 hadoop-2.4.0/conf", + "cp hadoop-2.4.0/etc/hadoop/yarn-site.xml hadoop-2.4.0/conf/yarn-site.xml" + ], + "yarn_nm_executable": "hadoop-2.4.0.tar.gz", + "yarn_nm_logs": [ + "hadoop-2.4.0/logs" + ], + "yarn_driver_configs": { + "yarn.resourcemanager.hostname": "localhost" + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/configs/zookeeper.json ---------------------------------------------------------------------- diff --git 
a/samza-test/src/main/python/configs/zookeeper.json b/samza-test/src/main/python/configs/zookeeper.json new file mode 100644 index 0000000..2762197 --- /dev/null +++ b/samza-test/src/main/python/configs/zookeeper.json @@ -0,0 +1,16 @@ +{ + "zookeeper_hosts": { + "zookeeper_instance_0": "localhost" + }, + "zookeeper_start_cmd": "zookeeper-3.4.3/bin/zkServer.sh start", + "zookeeper_stop_cmd": "zookeeper-3.4.3/bin/zkServer.sh stop", + "zookeeper_install_path": "deploy/zookeeper", + "zookeeper_executable": "zookeeper-3.4.3.tar.gz", + "zookeeper_post_install_cmds": [ + "cp zookeeper-3.4.3/conf/zoo_sample.cfg zookeeper-3.4.3/conf/zoo.cfg", + "sed -i.bak 's/.*dataDir=.*/dataDir=data/g' zookeeper-3.4.3/conf/zoo.cfg" + ], + "zookeeper_logs": [ + "zookeeper.out" + ] +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/deployment.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py new file mode 100644 index 0000000..a0e1481 --- /dev/null +++ b/samza-test/src/main/python/deployment.py @@ -0,0 +1,111 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+import os
+import logging
+import shutil
+import urllib
+import zopkio.runtime as runtime
+import zopkio.adhoc_deployer as adhoc_deployer
+from zopkio.runtime import get_active_config as c
+from samza_job_yarn_deployer import SamzaJobYarnDeployer
+
+logger = logging.getLogger(__name__)
+deployers = None
+samza_job_deployer = None
+samza_install_path = None
+
+# Components in install order. teardown_suite walks this list in reverse so
+# that dependents (e.g. Kafka, the YARN NM) are stopped before the services
+# they rely on (e.g. ZooKeeper, the YARN RM).
+DEPLOY_ORDER = ['zookeeper', 'yarn_rm', 'yarn_nm', 'kafka']
+
+def _download_packages():
+  """
+  Downloads the packages named by the url_hadoop, url_kafka and url_zookeeper
+  configs into the current working directory, reusing files that already exist.
+
+  Each download is written to '<filename>.part' and renamed only once the
+  download finished, so an interrupted download is never mistaken for a
+  cached package on the next run.
+  """
+  for url_key in ['url_hadoop', 'url_kafka', 'url_zookeeper']:
+    logger.debug('Getting download URL for: {0}'.format(url_key))
+    url = c(url_key)
+    filename = os.path.basename(url)
+    if os.path.exists(filename):
+      logger.debug('Using cached file: {0}'.format(filename))
+    else:
+      logger.info('Downloading: {0}'.format(url))
+      partial_filename = filename + '.part'
+      urllib.urlretrieve(url, partial_filename)
+      os.rename(partial_filename, filename)
+
+def _new_ssh_deployer(config_prefix, name=None):
+  """
+  Builds a zopkio SSHDeployer from the '<config_prefix>_*' configs
+  (install_path, executable, post_install_cmds, start_cmd, stop_cmd).
+
+  param: config_prefix -- Prefix of the component's configs, e.g. 'kafka'.
+  param: name -- Deployer name; defaults to config_prefix.
+  """
+  deployer_name = config_prefix if name is None else name
+  return adhoc_deployer.SSHDeployer(deployer_name, {
+    'install_path': os.path.join(c('remote_install_path'), c(config_prefix + '_install_path')),
+    'executable': c(config_prefix + '_executable'),
+    'post_install_cmds': c(config_prefix + '_post_install_cmds', []),
+    'start_command': c(config_prefix + '_start_cmd'),
+    'stop_command': c(config_prefix + '_stop_cmd'),
+    'extract': True,
+    'sync': True,
+  })
+
+def setup_suite():
+  """
+  Downloads and deploys ZooKeeper, YARN (RM and NM) and Kafka in DEPLOY_ORDER,
+  then installs the smoke_tests Samza package and starts the negate_number job.
+  Sets the module globals deployers, samza_job_deployer and samza_install_path.
+  """
+  global deployers, samza_job_deployer, samza_install_path
+  logger.info('Current working directory: {0}'.format(os.getcwd()))
+  samza_install_path = os.path.join(c('remote_install_path'), c('samza_install_path'))
+
+  _download_packages()
+
+  deployers = dict((name, _new_ssh_deployer(name)) for name in DEPLOY_ORDER)
+
+  # Enforce install order through list.
+  for name in DEPLOY_ORDER:
+    deployer = deployers[name]
+    runtime.set_deployer(name, deployer)
+    for instance, host in c(name + '_hosts').iteritems():
+      logger.info('Deploying {0} on host: {1}'.format(instance, host))
+      deployer.deploy(instance, {
+        'hostname': host
+      })
+
+  # Start the Samza jobs.
+  samza_job_deployer = SamzaJobYarnDeployer({
+    'yarn_site_template': c('yarn_site_template'),
+    'yarn_driver_configs': c('yarn_driver_configs'),
+    'yarn_nm_hosts': c('yarn_nm_hosts').values(),
+    'install_path': samza_install_path,
+  })
+
+  samza_job_deployer.install('smoke_tests', {
+    'executable': c('samza_executable'),
+  })
+
+  samza_job_deployer.start('negate_number', {
+    'package_id': 'smoke_tests',
+    'config_factory': c('samza_config_factory'),
+    'config_file': c('samza_config_file'),
+    'install_path': samza_install_path,
+  })
+
+def teardown_suite():
+  """
+  Stops the negate_number job and uninstalls its package, then undeploys every
+  component in reverse DEPLOY_ORDER. The Samza steps are skipped when
+  setup_suite never created the job deployer, and components whose deployer
+  was never created are skipped.
+  """
+  # Stop the samza jobs. samza_job_deployer is still None if setup_suite
+  # failed before reaching the job deployment step.
+  if samza_job_deployer is not None:
+    samza_job_deployer.stop('negate_number', {
+      'package_id': 'smoke_tests',
+      'install_path': samza_install_path,
+    })
+
+    samza_job_deployer.uninstall('smoke_tests')
+
+  # Undeploy everything, in reverse install order.
+  for name in reversed(DEPLOY_ORDER):
+    deployer = (deployers or {}).get(name)
+    if deployer is None:
+      continue
+    for instance in c(name + '_hosts'):
+      deployer.undeploy(instance)
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/perf.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/perf.py b/samza-test/src/main/python/perf.py
new file mode 100644
index 0000000..144cf58
--- /dev/null
+++ b/samza-test/src/main/python/perf.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from zopkio.runtime import get_active_config as c
+
+LOGS_DIRECTORY = 'logs'
+OUTPUT_DIRECTORY = 'output'
+
+# Components whose logs are collected. Each one has '<prefix>_hosts',
+# '<prefix>_install_path' and '<prefix>_logs' configs.
+LOG_COMPONENTS = ['zookeeper', 'yarn_rm', 'yarn_nm', 'kafka']
+
+def machine_logs():
+  """
+  Returns a map from instance ID to the absolute remote paths of that
+  instance's log files.
+
+  Instance IDs are the keys of each '<prefix>_hosts' config, which are the IDs
+  deployment.py deploys the components under, so every deployed instance gets
+  its logs (not just '<prefix>_instance_0'). Each path is
+  remote_install_path / <prefix>_install_path / <entry of <prefix>_logs>.
+  """
+  log_paths = {}
+  for config_prefix in LOG_COMPONENTS:
+    deployed_path = os.path.join(c('remote_install_path'), c(config_prefix + '_install_path'))
+    absolute_log_paths = [os.path.join(deployed_path, log) for log in c(config_prefix + '_logs')]
+    for instance in c(config_prefix + '_hosts'):
+      # Give each instance its own list so callers can't alias them.
+      log_paths[instance] = list(absolute_log_paths)
+  return log_paths
+
+def naarad_logs():
+  """
+  Returns the per-instance log lists handed to naarad. All lists are empty,
+  i.e. no logs are currently analysed by naarad.
+  """
+  return {
+    'zookeeper_instance_0': [],
+    'kafka_instance_0': [],
+    'samza_instance_0': [],
+    'yarn_rm_instance_0': [],
+    'yarn_nm_instance_0': [],
+  }
+
+def naarad_config(config, test_name=None):
+  """
+  Returns the path of the naarad.cfg file that sits next to this module.
+  config and test_name are accepted for the caller's interface but unused.
+  """
+  return os.path.join(os.path.dirname(os.path.abspath(__file__)), 'naarad.cfg')
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/requirements.txt
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/requirements.txt b/samza-test/src/main/python/requirements.txt
new file mode 100644
index 0000000..2ae9590
--- /dev/null
+++ b/samza-test/src/main/python/requirements.txt
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +zopkio +requests +kafka-python +Jinja2 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/samza_job_yarn_deployer.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/samza_job_yarn_deployer.py b/samza-test/src/main/python/samza_job_yarn_deployer.py new file mode 100644 index 0000000..ad73677 --- /dev/null +++ b/samza-test/src/main/python/samza_job_yarn_deployer.py @@ -0,0 +1,255 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import re
+import logging
+import json
+import requests
+import shutil
+import tarfile
+import zopkio.constants as constants
+import templates
+
+from subprocess import PIPE, Popen
+from zopkio.deployer import Deployer, Process
+from zopkio.remote_host_helper import better_exec_command, DeploymentError, get_sftp_client, get_ssh_client, open_remote_file
+
+logger = logging.getLogger(__name__)
+
+class SamzaJobYarnDeployer(Deployer):
+  """
+  Deploys Samza job packages onto YARN NodeManager hosts. Jobs are started and
+  stopped by running the package's own bin/run-job.sh and bin/kill-yarn-job.sh
+  locally on the driver machine. Only install, start, stop and uninstall are
+  implemented; the remaining Deployer methods raise NotImplementedError.
+  """
+
+  def __init__(self, configs={}):
+    """
+    Instantiates a Samza job deployer that uses run-job.sh and kill-yarn-job.sh
+    to start and stop Samza jobs in a YARN grid.
+
+    param: configs -- Map of config key/values pairs. These configs will be used
+    as a default whenever overrides are not provided in the methods (install,
+    start, stop, etc) below. The map itself is never mutated by this class.
+    """
+    logging.getLogger("paramiko").setLevel(logging.ERROR)
+    # Map from job_id to the YARN application_id reported by run-job.sh; read by stop().
+    self.app_ids = {}
+    self.default_configs = configs
+    Deployer.__init__(self)
+
+  def install(self, package_id, configs={}):
+    """
+    Installs a gzipped job tarball on to a list of remote YARN NM hosts by
+    SFTP'ing it to the remote install_path, then prepares a local copy on the
+    driver machine so that run-job.sh can be executed from there.
+
+    Local side effects (relative to the current working directory): the tarball
+    is extracted into ./<package_id>, and a yarn-site.xml rendered from
+    yarn_site_template is written to ./<package_id>/config/yarn-site.xml.
+
+    param: package_id -- A unique ID used to identify an installed YARN package.
+    param: configs -- Map of config key/values pairs. Valid keys include:
+
+      yarn_site_template: Jinja2 yarn-site.xml template local path.
+      yarn_driver_configs: Key/value pairs to be injected into the yarn-site.xml template.
+      yarn_nm_hosts: A list of YARN NM hosts to install the package onto.
+      install_path: An absolute path where the package will be installed.
+      executable: A local path pointing to the .tgz package that should be installed on remote hosts.
+    """
+    configs = self._get_merged_configs(configs)
+    self._validate_configs(configs, ['yarn_site_template', 'yarn_driver_configs', 'yarn_nm_hosts', 'install_path', 'executable'])
+
+    # Get configs.
+    nm_hosts = configs.get('yarn_nm_hosts')
+    install_path = configs.get('install_path')
+    executable = configs.get('executable')
+
+    # SFTP the job tarball to every NM. It is copied as-is, not extracted
+    # remotely; start() points yarn.package.path at this file.
+    exec_file_location = os.path.join(install_path, self._get_package_tgz_name(package_id))
+
+    def progress(transferred_bytes, total_bytes_to_transfer):
+      logger.debug("{0} of {1} bytes transferred.".format(transferred_bytes, total_bytes_to_transfer))
+
+    for host in nm_hosts:
+      logger.info('Deploying {0} on host: {1}'.format(package_id, host))
+      with get_ssh_client(host) as ssh:
+        better_exec_command(ssh, "mkdir -p {0}".format(install_path), "Failed to create path: {0}".format(install_path))
+      with get_sftp_client(host) as ftp:
+        ftp.put(executable, exec_file_location, callback=progress)
+
+    # Extract archive locally so we can use run-job.sh. Close the archive even
+    # if extraction fails so the file handle is not leaked.
+    executable_tgz = tarfile.open(executable, 'r:gz')
+    try:
+      executable_tgz.extractall(package_id)
+    finally:
+      executable_tgz.close()
+
+    # Generate yarn-site.xml and install it in the package's local 'config' directory.
+    yarn_site_dir = self._get_yarn_conf_dir(package_id)
+    yarn_site_path = os.path.join(yarn_site_dir, 'yarn-site.xml')
+    logger.info("Installing yarn-site.xml to {0}".format(yarn_site_path))
+    if not os.path.exists(yarn_site_dir):
+      os.makedirs(yarn_site_dir)
+    templates.render_config(configs.get('yarn_site_template'), yarn_site_path, configs.get('yarn_driver_configs'))
+
+  def start(self, job_id, configs={}):
+    """
+    Starts a Samza job using the bin/run-job.sh script. The script is executed
+    locally on the driver machine from the package extracted by install(). The
+    YARN application_id it reports is recorded so that stop() can kill the job.
+
+    param: job_id -- A unique ID used to identify a Samza job. Job IDs are associated
+    with a package_id, and a config file.
+    param: configs -- Map of config key/values pairs. Valid keys include:
+
+      package_id: The package_id for the package that contains the code for job_id.
+        Usually, the package_id refers to the .tgz job tarball that contains the
+        code necessary to run job_id.
+      config_factory: The config factory to use to decode the config_file.
+      config_file: Path to the config file for the job to be run, relative to
+        the extracted package directory.
+      install_path: Path where the package for the job has been installed on remote NMs.
+      properties: (optional) A dict, or a list of (property-name, property-value)
+        pairs, of override properties passed to run-job.sh as --config
+        arguments. These properties override the config_file's properties.
+        yarn.package.path is always added, pointing at the installed tarball.
+        The caller's object is not modified.
+
+    raises: AssertionError if run-job.sh exits non-zero, or if its output has no
+    'Submitted application' line.
+    """
+    configs = self._get_merged_configs(configs)
+    self._validate_configs(configs, ['package_id', 'config_factory', 'config_file', 'install_path'])
+
+    # Get configs.
+    package_id = configs.get('package_id')
+    config_factory = configs.get('config_factory')
+    config_file = configs.get('config_file')
+    install_path = configs.get('install_path')
+    # Copy: _get_merged_configs is shallow, so writing into the original object
+    # would leak yarn.package.path into the caller's (or the default) configs.
+    properties = dict(configs.get('properties') or {})
+    properties['yarn.package.path'] = 'file:' + os.path.join(install_path, self._get_package_tgz_name(package_id))
+
+    # Execute bin/run-job.sh locally from driver machine. Build the argument
+    # list directly instead of splitting a string on spaces, so that paths and
+    # property values containing spaces are passed intact.
+    args = [
+      os.path.join(package_id, "bin/run-job.sh"),
+      "--config-factory={0}".format(config_factory),
+      "--config-path={0}".format(os.path.join(package_id, config_file)),
+    ]
+    for property_name, property_value in properties.iteritems():
+      args.extend(["--config", "{0}={1}".format(property_name, property_value)])
+    command = ' '.join(args)
+    env = self._get_env_vars(package_id)
+    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
+    output, err = p.communicate()
+    logger.debug("Output from run-job.sh:\nstdout: {0}\nstderr: {1}".format(output, err))
+    assert p.returncode == 0, "Command ({0}) returned non-zero exit code ({1}).\nstdout: {2}\nstderr: {3}".format(command, p.returncode, output, err)
+
+    # Save application_id for job_id so we can kill the job later.
+    regex = r'.*Submitted application (\w*)'
+    match = re.match(regex, output.replace("\n", ' '))
+    assert match, "Job ({0}) appears not to have started. Expected to see a log line matching regex: {1}".format(job_id, regex)
+    app_id = match.group(1)
+    logger.debug("Got application_id {0} for job_id {1}.".format(app_id, job_id))
+    self.app_ids[job_id] = app_id
+
+  def stop(self, job_id, configs={}):
+    """
+    Stops a Samza job using the bin/kill-yarn-job.sh script. The script is given
+    the application_id recorded by start(). If job_id was never started by this
+    deployer, a warning is logged and the method returns.
+
+    param: job_id -- A unique ID used to identify a Samza job.
+    param: configs -- Map of config key/values pairs. Valid keys include:
+
+      package_id: The package_id for the package that contains the code for job_id.
+        Usually, the package_id refers to the .tgz job tarball that contains the
+        code necessary to run job_id.
+
+    raises: AssertionError if kill-yarn-job.sh exits non-zero.
+    """
+    configs = self._get_merged_configs(configs)
+    self._validate_configs(configs, ['package_id'])
+
+    # Get configs.
+    package_id = configs.get('package_id')
+
+    # Get the application_id for the job.
+    application_id = self.app_ids.get(job_id)
+
+    # Kill the job, if it's been started, or WARN and return if it hasn't.
+    if not application_id:
+      logger.warning("Can't stop a job that was never started: {0}".format(job_id))
+      return
+
+    args = [os.path.join(package_id, "bin/kill-yarn-job.sh"), application_id]
+    command = ' '.join(args)
+    env = self._get_env_vars(package_id)
+    p = Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
+    # communicate() drains stdout/stderr while waiting. wait() with PIPEs can
+    # deadlock once the script fills the OS pipe buffer.
+    output, err = p.communicate()
+    logger.debug("Output from kill-yarn-job.sh:\nstdout: {0}\nstderr: {1}".format(output, err))
+    assert p.returncode == 0, "Command returned non-zero exit code ({0}): {1}".format(p.returncode, command)
+
+  def uninstall(self, package_id, configs={}):
+    """
+    Removes the install path from all remote hosts that package_id has been
+    installed on. Also deletes the package's locally extracted directory
+    (./<package_id>) from the driver machine.
+
+    Note: the whole remote install_path is removed with 'rm -rf', not only this
+    package's tarball, so anything else stored under install_path is deleted too.
+
+    param: package_id -- A unique ID used to identify an installed YARN package.
+    param: configs -- Map of config key/values pairs. Valid keys include:
+
+      yarn_nm_hosts: A list of hosts that package was installed on.
+      install_path: Path where the package for the job has been installed on remote NMs.
+    """
+    configs = self._get_merged_configs(configs)
+    self._validate_configs(configs, ['yarn_nm_hosts', 'install_path'])
+
+    # Get configs.
+    nm_hosts = configs.get('yarn_nm_hosts')
+    install_path = configs.get('install_path')
+
+    # Delete job package on all NMs.
+    for host in nm_hosts:
+      with get_ssh_client(host) as ssh:
+        better_exec_command(ssh, "rm -rf {0}".format(install_path), "Failed to remove {0}".format(install_path))
+
+    # Delete job package directory from local driver box.
+    shutil.rmtree(package_id)
+
+  # TODO we should implement the below helper methods over time, as we need them.
+
+  def get_pid(self, container_id, configs={}):
+    raise NotImplementedError
+
+  def get_host(self, container_id):
+    raise NotImplementedError
+
+  def get_containers(self, job_id):
+    raise NotImplementedError
+
+  def get_jobs(self):
+    raise NotImplementedError
+
+  def sleep(self, container_id, delay, configs={}):
+    raise NotImplementedError
+
+  def pause(self, container_id, configs={}):
+    raise NotImplementedError
+
+  def resume(self, container_id, configs={}):
+    raise NotImplementedError
+
+  def kill(self, container_id, configs={}):
+    raise NotImplementedError
+
+  def terminate(self, container_id, configs={}):
+    raise NotImplementedError
+
+  def get_logs(self, container_id, logs, directory):
+    raise NotImplementedError
+
+  def _validate_configs(self, configs, config_keys):
+    # Asserts that every key in config_keys maps to a truthy value in configs.
+    for required_config in config_keys:
+      assert configs.get(required_config), 'Required config is undefined: {0}'.format(required_config)
+
+  def _get_merged_configs(self, configs):
+    # Shallow copy of the default configs with the per-call overrides applied
+    # on top. Nested values (e.g. 'properties') are shared with the caller.
+    tmp = self.default_configs.copy()
+    tmp.update(configs)
+    return tmp
+
+  def _get_package_tgz_name(self, package_id):
+    return '{0}.tgz'.format(package_id)
+
+  def _get_yarn_home_dir(self, package_id):
+    # Absolute path of the local directory install() extracted the package into.
+    return os.path.abspath(package_id)
+
+  def _get_yarn_conf_dir(self, package_id):
+    # Local directory holding the yarn-site.xml rendered by install().
+    return os.path.join(self._get_yarn_home_dir(package_id), 'config')
+
+  def _get_env_vars(self, package_id):
+    # Copy of the current environment with YARN_CONF_DIR and HADOOP_CONF_DIR
+    # pointed at the package's local config directory, so the job scripts read
+    # the yarn-site.xml rendered by install().
+    env = os.environ.copy()
+    env['YARN_CONF_DIR'] = self._get_yarn_conf_dir(package_id)
+    env['HADOOP_CONF_DIR'] = env['YARN_CONF_DIR']
+    # Log only the overridden entries. Dumping the whole environment could copy
+    # credentials or tokens from the driver's shell into the test logs.
+    logger.debug('Built environment with YARN_CONF_DIR and HADOOP_CONF_DIR set to: {0}'.format(env['YARN_CONF_DIR']))
+    return env
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/templates.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/templates.py b/samza-test/src/main/python/templates.py
new file mode 100644
index 0000000..a85dc36
--- /dev/null
+++ b/samza-test/src/main/python/templates.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from contextlib import nested
+from jinja2 import Template
+
+def render_config(template_location, rendered_location, properties):
+  """
+  A method for rendering simple key/value configs into a template. Uses Jinja2
+  style templating.
+
+  param: template_location -- File path of the input Jinja2 template.
+  param: rendered_location -- File path where rendered output should be saved.
+  param: properties -- A dictionary of key/value pairs to be passed to the
+  template with the accessor name 'properties'.
+  """
+  # A two-item 'with' is used instead of contextlib.nested(): nested() only
+  # receives the files after both open() calls ran, so if opening the output
+  # raised, the already-opened template file would never be closed.
+  with open(template_location, 'r') as template_file, open(rendered_location, 'w') as rendered_file:
+    template = Template(template_file.read())
+    rendered_file.write(template.render(properties=properties))
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/templates/yarn-site.xml
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/templates/yarn-site.xml b/samza-test/src/main/python/templates/yarn-site.xml
new file mode 100644
index 0000000..d98436e
--- /dev/null
+++ b/samza-test/src/main/python/templates/yarn-site.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<configuration> + {% for property, value in properties.iteritems() %} + <property> + <name>{{ property }}</name> + <value>{{ value }}</value> + </property> + {% endfor %} +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/tests.py ---------------------------------------------------------------------- diff --git a/samza-test/src/main/python/tests.py b/samza-test/src/main/python/tests.py new file mode 100644 index 0000000..dae414e --- /dev/null +++ b/samza-test/src/main/python/tests.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# 'License'); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+# Directory containing this file. Every path in the descriptor below is
+# resolved against it, so the suite does not depend on the working directory.
+# Named script_dir rather than dir to avoid shadowing the dir() builtin.
+script_dir = os.path.dirname(os.path.abspath(__file__))
+
+# Test suite descriptor: deployment hooks, perf/log-collection hooks, config
+# directory and test modules. Presumably consumed by the zopkio test runner
+# (confirm against bin/integration-tests.sh).
+test = {
+  'deployment_code': os.path.join(script_dir, 'deployment.py'),
+  'perf_code': os.path.join(script_dir, 'perf.py'),
+  'configs_directory': os.path.join(script_dir, 'configs'),
+  'test_code': [
+    os.path.join(script_dir, 'tests', 'smoke_tests.py')
+  ],
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/python/tests/smoke_tests.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/tests/smoke_tests.py b/samza-test/src/main/python/tests/smoke_tests.py
new file mode 100644
index 0000000..7aec4e0
--- /dev/null
+++ b/samza-test/src/main/python/tests/smoke_tests.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import time
+import logging
+import socket
+import errno
+from kafka import KafkaClient, SimpleProducer, SimpleConsumer
+import zopkio.runtime as runtime
+
+logger = logging.getLogger(__name__)
+
+CWD = os.path.dirname(os.path.abspath(__file__))
+HOME_DIR = os.path.join(CWD, os.pardir)
+DATA_DIR = os.path.join(HOME_DIR, 'data')
+TEST_TOPIC = 'samza-test-topic'
+TEST_OUTPUT_TOPIC = 'samza-test-topic-output'
+NUM_MESSAGES = 50
+
+def test_samza_job():
+  """
+  Sends NUM_MESSAGES messages (the strings '1' .. '50') to samza-test-topic
+  using a synchronous producer (async=False).
+  """
+  logger.info('Running test_samza_job')
+  kafka = _get_kafka_client()
+  try:
+    kafka.ensure_topic_exists(TEST_TOPIC)
+    producer = SimpleProducer(kafka,
+      async=False,
+      req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+      ack_timeout=30000)
+    for i in range(1, NUM_MESSAGES + 1):
+      producer.send_messages(TEST_TOPIC, str(i))
+  finally:
+    # Close the client even if a send fails, so connections are not leaked.
+    kafka.close()
+
+def validate_samza_job():
+  """
+  Validates that negate-number negated all messages, and sent the output to
+  samza-test-topic-output. Reads up to NUM_MESSAGES messages (blocking, with
+  timeout=60), then asserts that exactly NUM_MESSAGES arrived and that each
+  one parses as a negative integer.
+  """
+  logger.info('Running validate_samza_job')
+  kafka = _get_kafka_client()
+  try:
+    kafka.ensure_topic_exists(TEST_OUTPUT_TOPIC)
+    consumer = SimpleConsumer(kafka, 'samza-test-group', TEST_OUTPUT_TOPIC)
+    messages = consumer.get_messages(count=NUM_MESSAGES, block=True, timeout=60)
+    message_count = len(messages)
+    assert NUM_MESSAGES == message_count, 'Expected {0} lines, but found {1}'.format(NUM_MESSAGES, message_count)
+    for message in map(lambda m: m.message.value, messages):
+      assert int(message) < 0 , 'Expected negative integer but received {0}'.format(message)
+  finally:
+    # Close the client even when an assertion fails.
+    kafka.close()
+
+def _get_kafka_client(num_retries=20, retry_sleep=1):
+  """
+  Returns a KafkaClient based off of the kafka_hosts and kafka_port configs set
+  in the active runtime. Before connecting, it waits for the first configured
+  broker to accept TCP connections.
+
+  param: num_retries -- Number of connection attempts made to the first broker.
+  param: retry_sleep -- Seconds to sleep between failed connection attempts.
+  raises: Exception if the first broker cannot be reached.
+  """
+  kafka_hosts = list(runtime.get_active_config('kafka_hosts').values())
+  kafka_port = runtime.get_active_config('kafka_port')
+  assert len(kafka_hosts) > 0, 'Missing required configuration: kafka_hosts'
+  # Comma-separated "host:port" list with exactly one entry per broker.
+  connect_string = ','.join('{0}:{1}'.format(host, kafka_port) for host in kafka_hosts)
+  # Wait for the first configured broker to come up.
+  if not _wait_for_server(kafka_hosts[0], kafka_port, 30, num_retries, retry_sleep):
+    raise Exception('Unable to connect to Kafka broker: {0}:{1}'.format(kafka_hosts[0], kafka_port))
+  return KafkaClient(connect_string)
+
+def _wait_for_server(host, port, timeout=5, retries=12, retry_sleep=1):
+  """
+  Keep trying to open a TCP connection to host:port until one succeeds or the
+  retry count has been reached.
+
+  A fresh socket is used for every attempt and is always closed, because the
+  state of a socket whose connect() failed is unspecified. Timeouts and refused
+  connections (server not listening yet) are retried after sleeping
+  retry_sleep seconds; any other socket error is raised.
+
+  param: host, port -- Address to connect to.
+  param: timeout -- Per-attempt connect timeout, in seconds.
+  param: retries -- Maximum number of connection attempts.
+  param: retry_sleep -- Seconds to sleep between failed attempts.
+  returns: True once a connection succeeds, False if every attempt failed.
+  """
+  for attempt in range(retries):
+    s = socket.socket()
+    s.settimeout(timeout)
+    try:
+      s.connect((host, port))
+      return True
+    except socket.timeout:
+      # Exception occurs if timeout is set. Wait and retry.
+      pass
+    except socket.error as err:
+      # ETIMEDOUT occurs if timeout > underlying network timeout; ECONNREFUSED
+      # occurs while the server is still starting. Wait and retry on both.
+      if err.errno not in (errno.ETIMEDOUT, errno.ECONNREFUSED):
+        raise
+    finally:
+      s.close()
+    if attempt < retries - 1:
+      time.sleep(retry_sleep)
+  return False
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/hello-stateful-world.samsa
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/hello-stateful-world.samsa b/samza-test/src/main/resources/hello-stateful-world.samsa
deleted file mode 100644
index 745f881..0000000
--- a/samza-test/src/main/resources/hello-stateful-world.samsa
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# Job -job.factory.class=samza.job.local.ThreadJobFactory -job.name=hello-stateful-world - -# Task -task.class=samza.test.integration.SimpleStatefulTask -task.inputs=kafka.input - -# Stores -stores.mystore.factory=samza.storage.kv.KeyValueStorageEngineFactory -stores.mystore.key.serde=string -stores.mystore.msg.serde=string -stores.mystore.changelog=kafka.mystore http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/join/checker.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/join/checker.samsa b/samza-test/src/main/resources/join/checker.samsa deleted file mode 100644 index 6a6b9cd..0000000 --- a/samza-test/src/main/resources/join/checker.samsa +++ /dev/null @@ -1,36 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -# Job -job.name=checker - -systems.kafka.partitioner.class=samza.test.integration.join.EpochPartitioner - -# Task -task.class=samza.test.integration.join.Checker -task.inputs=kafka.completed-keys - -stores.checker-state.factory=samza.storage.kv.KeyValueStorageEngineFactory -stores.checker-state.key.serde=string -stores.checker-state.msg.serde=string -stores.checker-state.changelog=kafka.checker-state - -task.window.ms=300000 - -num.partitions=4 -expected.keys=100000 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/join/emitter.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/join/emitter.samsa b/samza-test/src/main/resources/join/emitter.samsa deleted file mode 100644 index 5e94322..0000000 --- a/samza-test/src/main/resources/join/emitter.samsa +++ /dev/null @@ -1,33 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.name=emitter - -# Task -task.class=samza.test.integration.join.Emitter -task.inputs=kafka.epoch - -stores.emitter-state.factory=samza.storage.kv.KeyValueStorageEngineFactory -stores.emitter-state.key.serde=string -stores.emitter-state.msg.serde=string -stores.emitter-state.changelog=kafka.emitter-state - -task.window.ms=0 - -count=100000 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/join/joiner.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/join/joiner.samsa b/samza-test/src/main/resources/join/joiner.samsa deleted file mode 100644 index 4ecee2b..0000000 --- a/samza-test/src/main/resources/join/joiner.samsa +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.name=joiner - -# Task -task.class=samza.test.integration.join.Joiner -task.inputs=kafka.emitted - -stores.joiner-state.factory=samza.storage.kv.KeyValueStorageEngineFactory -stores.joiner-state.key.serde=string -stores.joiner-state.msg.serde=string -stores.joiner-state.changelog=kafka.checker-state - -num.partitions=4 http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/join/watcher.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/join/watcher.samsa b/samza-test/src/main/resources/join/watcher.samsa deleted file mode 100644 index 025f055..0000000 --- a/samza-test/src/main/resources/join/watcher.samsa +++ /dev/null @@ -1,31 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.name=watcher - -# Task -task.class=samza.test.integration.join.Joiner -task.inputs=kafka.epoch - -task.window.ms=300000 - -max.time.between.epochs.ms=600000 -mail.smtp.host=TODO -mail.to=d...@samza.incubator.apache.org -mail.from=gre...@incubator.apache.org http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/perf/counter.samsa ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/perf/counter.samsa b/samza-test/src/main/resources/perf/counter.samsa deleted file mode 100644 index 6e80a22..0000000 --- a/samza-test/src/main/resources/perf/counter.samsa +++ /dev/null @@ -1,30 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -# Job -job.name=counter-task - -# Task -task.class=samza.test.integration.StatePerfTestTask -task.inputs=kafka.input - -# Stores -stores.mystore.factory=samza.storage.kv.KeyValueStorageEngineFactory -stores.mystore.key.serde=string -stores.mystore.msg.serde=string -stores.mystore.changelog=kafka.mystore http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/6bbbaa59/samza-test/src/main/resources/perf/kv-perf.properties ---------------------------------------------------------------------- diff --git a/samza-test/src/main/resources/perf/kv-perf.properties b/samza-test/src/main/resources/perf/kv-perf.properties deleted file mode 100644 index dcc223f..0000000 --- a/samza-test/src/main/resources/perf/kv-perf.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory -stores.test.compaction.delete.threshold=1000 -test.partition.count=4 -test.num.loops=1000