Repository: samza-hello-samza Updated Branches: refs/heads/latest e7811dbf7 -> f48c7f9f3
SAMZA-1524: Azure checkpoint and eventhubs fluent api standalone example Tutorial and docs coming soon Author: Daniel Chen <[email protected]> Reviewers: Jagadish<[email protected]> Closes #29 from dxichen/azure-examples Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/f48c7f9f Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/f48c7f9f Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/f48c7f9f Branch: refs/heads/latest Commit: f48c7f9f3419e69821b9685dcffc3fd89803f8fa Parents: e7811db Author: Daniel Chen <[email protected]> Authored: Mon Dec 11 17:56:18 2017 -0800 Committer: Jagadish <[email protected]> Committed: Mon Dec 11 17:56:18 2017 -0800 ---------------------------------------------------------------------- bin/run-azure-application.sh | 30 ++++++++++ pom.xml | 5 ++ src/main/assembly/src.xml | 4 ++ .../azure-application-local-runner.properties | 49 ++++++++++++++++ .../samza/examples/azure/AzureApplication.java | 61 ++++++++++++++++++++ .../examples/azure/AzureZKLocalApplication.java | 42 ++++++++++++++ 6 files changed, 191 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/bin/run-azure-application.sh ---------------------------------------------------------------------- diff --git a/bin/run-azure-application.sh b/bin/run-azure-application.sh new file mode 100755 index 0000000..8cd2463 --- /dev/null +++ b/bin/run-azure-application.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +home_dir=`pwd` +base_dir=$(dirname $0)/.. +cd $base_dir +base_dir=`pwd` +cd $home_dir + +export EXECUTION_PLAN_DIR="$base_dir/plan" +mkdir -p $EXECUTION_PLAN_DIR + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" + +exec $(dirname $0)/run-class.sh samza.examples.azure.AzureZKLocalApplication --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/azure-application-local-runner.properties http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ada4c2b..5b2eb55 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,11 @@ under the License. </dependency> <dependency> <groupId>org.apache.samza</groupId> + <artifactId>samza-azure</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> + <groupId>org.apache.samza</groupId> <artifactId>samza-core_2.11</artifactId> <version>${samza.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/src.xml b/src/main/assembly/src.xml index c04ace0..8f3694e 100644 --- a/src/main/assembly/src.xml +++ b/src/main/assembly/src.xml @@ -48,6 +48,10 @@ <source>${basedir}/bin/run-wikipedia-zk-application.sh</source> <outputDirectory>bin</outputDirectory> </file> + <file> + <source>${basedir}/bin/run-azure-application.sh</source> + <outputDirectory>bin</outputDirectory> + </file> </files> <dependencySets> <dependencySet> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/config/azure-application-local-runner.properties ---------------------------------------------------------------------- diff --git a/src/main/config/azure-application-local-runner.properties b/src/main/config/azure-application-local-runner.properties new file mode 100644 index 0000000..e440fd8 --- /dev/null +++ b/src/main/config/azure-application-local-runner.properties @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Job +job.name=azure-application-local-runner +job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory +job.default.system=eventhubs +job.coordinator.zk.connect=localhost:2181 + +# Azure EventHubs System +systems.eventhubs.samza.factory=org.apache.samza.system.eventhub.EventHubSystemFactory +systems.eventhubs.stream.list=output-stream,input-stream + +# Add your EventHubs input stream credentials here +systems.eventhubs.streams.input-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE +systems.eventhubs.streams.input-stream.eventhubs.entitypath=YOUR-ENTITY-NAME +systems.eventhubs.streams.input-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME +systems.eventhubs.streams.input-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN + +# Add your EventHubs output stream credentials here +systems.eventhubs.streams.output-stream.eventhubs.namespace=YOUR-STREAM-NAMESPACE +systems.eventhubs.streams.output-stream.eventhubs.entitypath=YOUR-ENTITY-NAME +systems.eventhubs.streams.output-stream.eventhubs.sas.keyname=YOUR-SAS-KEY-NAME +systems.eventhubs.streams.output-stream.eventhubs.sas.token=YOUR-SAS-KEY-TOKEN + +# Azure Table Checkpoint Manager +task.checkpoint.factory=org.apache.samza.checkpoint.azure.AzureCheckpointManagerFactory +azure.storage.connect=YOUR-STORAGE-ACCOUNT-CONNECTION-STRING + +# Task/Application +task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory + +# Streams +streams.input-stream.samza.system=eventhubs +streams.output-stream.samza.system=eventhubs http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java new file mode 100644 index 0000000..9f565fe --- /dev/null +++ b/src/main/java/samza/examples/azure/AzureApplication.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.azure; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.serializers.ByteSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; + +public class AzureApplication implements StreamApplication { + + // Inputs + private static final String INPUT_STREAM_ID = "input-stream"; + + // Outputs + private static final String OUTPUT_STREAM_ID = "output-stream"; + + @Override + public void init(StreamGraph graph, Config config) { + + // Input + MessageStream<KV<String, byte[]>> eventhubInput = graph.getInputStream(INPUT_STREAM_ID); + + // Output + OutputStream<KV<String, byte[]>> eventhubOutput = + graph.getOutputStream(OUTPUT_STREAM_ID, KVSerde.of(new StringSerde(), new ByteSerde())); + + // Send + eventhubInput + .filter((message) -> message.getKey() != null) + .map((message) -> { + System.out.println("Sending: "); + System.out.println("Received Key: " + message.getKey()); + System.out.println("Received Message: " + new String(message.getValue())); + return message; + }) + .sendTo(eventhubOutput); + } +} http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/f48c7f9f/src/main/java/samza/examples/azure/AzureZKLocalApplication.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/azure/AzureZKLocalApplication.java b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java new file mode 100644 index 0000000..3d4f8b0 --- /dev/null +++ b/src/main/java/samza/examples/azure/AzureZKLocalApplication.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package samza.examples.azure; + +import joptsimple.OptionSet; +import org.apache.samza.config.Config; +import org.apache.samza.runtime.LocalApplicationRunner; +import org.apache.samza.util.CommandLine; +import samza.examples.azure.AzureApplication; + +public class AzureZKLocalApplication { + + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + OptionSet options = cmdLine.parser().parse(args); + Config config = cmdLine.loadConfig(options); + + LocalApplicationRunner runner = new LocalApplicationRunner(config); + AzureApplication app = new AzureApplication(); + + runner.run(app); + runner.waitForFinish(); + } + +}
