Repository: samza-hello-samza Updated Branches: refs/heads/latest 3d0e919e6 -> 591aaebc4
SAMZA-1237: Add support for standalone mode to wikipedia application Author: Bharath Kumarasubramanian <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #13 from bharathkk/latest Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/591aaebc Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/591aaebc Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/591aaebc Branch: refs/heads/latest Commit: 591aaebc4ffca24a193b068fdc526e20cc57d06b Parents: 3d0e919 Author: Bharath Kumarasubramanian <[email protected]> Authored: Fri May 12 07:51:59 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Fri May 12 07:51:59 2017 -0700 ---------------------------------------------------------------------- .gitignore | 1 + bin/grid | 27 ++++++++- bin/run-wikipedia-standalone-application.sh | 30 ++++++++++ src/main/assembly/src.xml | 52 ++++------------- ...ikipedia-application-local-runner.properties | 60 ++++++++++++++++++++ .../application/WikipediaApplication.java | 2 +- .../WikipediaZkLocalApplication.java | 47 +++++++++++++++ 7 files changed, 177 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 849ce6a..f31af00 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ deploy *.swp build/ .gradle/ +state http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/grid ---------------------------------------------------------------------- diff --git a/bin/grid b/bin/grid index 7d2112b..5f715b5 100755 --- a/bin/grid +++ b/bin/grid @@ -39,7 +39,7 @@ DOWNLOAD_KAFKA=http://www.us.apache.org/dist/kafka/0.10.1.1/kafka_2.11-0.10.1.1. 
DOWNLOAD_YARN=https://archive.apache.org/dist/hadoop/common/hadoop-2.6.1/hadoop-2.6.1.tar.gz DOWNLOAD_ZOOKEEPER=http://archive.apache.org/dist/zookeeper/zookeeper-3.4.3/zookeeper-3.4.3.tar.gz -SERVICE_WAIT_TIMEOUT_SEC=10 +SERVICE_WAIT_TIMEOUT_SEC=20 ZOOKEEPER_PORT=2181 RESOURCEMANAGER_PORT=8032 NODEMANAGER_PORT=8042 @@ -55,6 +55,16 @@ bootstrap() { exit 0 } +standalone() { + echo "Setting up the ystem..." + stop_all + rm -rf "$DEPLOY_ROOT_DIR" + mkdir "$DEPLOY_ROOT_DIR" + install_all_without_yarn + start_all_without_yarn + exit 0 +} + install_all() { $DIR/grid install samza $DIR/grid install zookeeper @@ -62,6 +72,12 @@ install_all() { $DIR/grid install kafka } +install_all_without_yarn() { + $DIR/grid install samza + $DIR/grid install zookeeper + $DIR/grid install kafka +} + install_samza() { mkdir -p "$DEPLOY_ROOT_DIR" if [ -d "$DOWNLOAD_CACHE_DIR/samza/.git" ]; then @@ -128,6 +144,11 @@ start_all() { $DIR/grid start kafka } +start_all_without_yarn() { + $DIR/grid start zookeeper + $DIR/grid start kafka +} + start_zookeeper() { if [ -f $DEPLOY_ROOT_DIR/$SYSTEM/bin/zkServer.sh ]; then cd $DEPLOY_ROOT_DIR/$SYSTEM @@ -218,6 +239,9 @@ stop_kafka() { if [ "$COMMAND" == "bootstrap" ] && test -z "$SYSTEM"; then bootstrap exit 0 +elif [ "$COMMAND" == "standalone" ] && test -z "$SYSTEM"; then + standalone + exit 0 elif (test -z "$COMMAND" && test -z "$SYSTEM") \ || ( [ "$COMMAND" == "help" ] || test -z "$COMMAND" || test -z "$SYSTEM"); then echo @@ -225,6 +249,7 @@ elif (test -z "$COMMAND" && test -z "$SYSTEM") \ echo echo " $ grid" echo " $ grid bootstrap" + echo " $ grid standalone" echo " $ grid install [yarn|kafka|zookeeper|samza|all]" echo " $ grid start [yarn|kafka|zookeeper|all]" echo " $ grid stop [yarn|kafka|zookeeper|all]" http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/bin/run-wikipedia-standalone-application.sh ---------------------------------------------------------------------- diff --git a/bin/run-wikipedia-standalone-application.sh 
b/bin/run-wikipedia-standalone-application.sh new file mode 100755 index 0000000..f750e2b --- /dev/null +++ b/bin/run-wikipedia-standalone-application.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +home_dir=`pwd` +base_dir=$(dirname $0)/.. 
+cd $base_dir +base_dir=`pwd` +cd $home_dir + +export EXECUTION_PLAN_DIR="$base_dir/plan" +mkdir -p $EXECUTION_PLAN_DIR + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" + +exec $(dirname $0)/run-class.sh samza.examples.wikipedia.application.WikipediaZkLocalApplication "$@" http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index ca90ebf..69cbbbe 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -28,53 +28,25 @@ <include>NOTICE*</include> </includes> </fileSet> + <fileSet> + <directory>${basedir}/src/main/config</directory> + <includes> + <include>*.properties</include> + </includes> + <outputDirectory>config</outputDirectory> + <!-- filtered=true, so we do variable expansion so the yarn package path + always points to the correct spot on any machine --> + <filtered>true</filtered> + </fileSet> </fileSets> <files> <file> <source>${basedir}/src/main/resources/log4j.xml</source> <outputDirectory>lib</outputDirectory> </file> - <!-- filtered=true, so we do variable expansion so the yarn package path - always points to the correct spot on any machine --> - <file> - <source>${basedir}/src/main/config/wikipedia-feed.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/wikipedia-parser.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/wikipedia-stats.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> <file> - <source>${basedir}/src/main/config/wikipedia-application.properties</source> - <outputDirectory>config</outputDirectory> - 
<filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/tumbling-pageview-counter.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/pageview-sessionizer.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/pageview-filter.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> - </file> - <file> - <source>${basedir}/src/main/config/pageview-adclick-joiner.properties</source> - <outputDirectory>config</outputDirectory> - <filtered>true</filtered> + <source>${basedir}/bin/run-local-app.sh</source> + <outputDirectory>bin</outputDirectory> </file> </files> <dependencySets> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/config/wikipedia-application-local-runner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/wikipedia-application-local-runner.properties b/src/main/config/wikipedia-application-local-runner.properties new file mode 100644 index 0000000..965a131 --- /dev/null +++ b/src/main/config/wikipedia-application-local-runner.properties @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.name=wikipedia-application +job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory +job.default.system=kafka +coordinator.zk.connect=localhost:2181 + +# Task/Application +app.processor-id-generator.class=org.apache.samza.runtime.UUIDGenerator +task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory +serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory +serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory + +# Wikipedia System +systems.wikipedia.samza.factory=samza.examples.wikipedia.system.WikipediaSystemFactory +systems.wikipedia.host=irc.wikimedia.org +systems.wikipedia.port=6667 + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 +systems.kafka.default.stream.replication.factor=1 +systems.kafka.default.stream.samza.msg.serde=json + +# Streams +streams.en-wikipedia.samza.system=wikipedia +streams.en-wikipedia.samza.physical.name=#en.wikipedia + +streams.en-wiktionary.samza.system=wikipedia +streams.en-wiktionary.samza.physical.name=#en.wiktionary + +streams.en-wikinews.samza.system=wikipedia +streams.en-wikinews.samza.physical.name=#en.wikinews + +# Key-value storage +stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory 
+stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog +stores.wikipedia-stats.key.serde=string +stores.wikipedia-stats.msg.serde=integer + http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java index b0779db..3432e3d 100644 --- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java @@ -88,7 +88,7 @@ public class WikipediaApplication implements StreamApplication { OutputStream<Void, Map<String, Integer>, Map<String, Integer>> wikipediaStats = graph.getOutputStream(STATS_STREAM_ID, m -> null, m -> m); // Merge inputs - MessageStream<WikipediaFeedEvent> allWikipediaEvents = wikipediaEvents.merge(ImmutableList.of(wiktionaryEvents, wikiNewsEvents)); + MessageStream<WikipediaFeedEvent> allWikipediaEvents = MessageStream.mergeAll(ImmutableList.of(wikipediaEvents, wiktionaryEvents, wikiNewsEvents)); // Parse, update stats, prepare output, and send allWikipediaEvents.map(WikipediaParser::parseEvent) http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/591aaebc/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java new file mode 100644 index 0000000..8e978bc --- /dev/null +++ b/src/main/java/samza/examples/wikipedia/application/WikipediaZkLocalApplication.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.wikipedia.application; + +import joptsimple.OptionSet; +import org.apache.samza.config.Config; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.util.CommandLine; +import org.apache.samza.util.Util; + + +/** + * An entry point for {@link WikipediaApplication} that runs in stand alone mode using zookeeper. + * It waits for the job to finish; The job can also be ended by killing this process. + */ +public class WikipediaZkLocalApplication { + + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + OptionSet options = cmdLine.parser().parse(args); + Config orgConfig = cmdLine.loadConfig(options); + Config config = Util.rewriteConfig(orgConfig); + + LocalApplicationRunner runner = new LocalApplicationRunner(config); + WikipediaApplication app = new WikipediaApplication(); + + runner.run(app); + runner.waitForFinish(); + } +}
