Repository: samza-hello-samza Updated Branches: refs/heads/master 5fa742ac4 -> 90bbe75f2
Adding hello-samza example for kinesis Author: Aditya Toomula <[email protected]> Reviewers: Jagadish<[email protected]> Closes #30 from atoomula/kinesis Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/90bbe75f Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/90bbe75f Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/90bbe75f Branch: refs/heads/master Commit: 90bbe75f2d12ddf2b3d538c86fb3030f2a06f589 Parents: 5fa742a Author: Aditya Toomula <[email protected]> Authored: Wed Dec 13 14:58:02 2017 -0800 Committer: Jagadish <[email protected]> Committed: Wed Dec 13 14:58:02 2017 -0800 ---------------------------------------------------------------------- build.gradle | 1 + pom.xml | 5 ++ src/main/config/kinesis-hello-samza.properties | 54 ++++++++++++++++++++ .../examples/kinesis/KinesisHelloSamza.java | 42 +++++++++++++++ 4 files changed, 102 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 9d1f543..21793c4 100644 --- a/build.gradle +++ b/build.gradle @@ -48,6 +48,7 @@ dependencies { compile(group: 'org.schwering', name: 'irclib', version: '1.10') compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION") compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION") + compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION") explode (group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION") http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index da7ec90..a933038 
100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,11 @@ under the License. <version>${samza.version}</version> </dependency> <dependency> + <groupId>org.apache.samza</groupId> + <artifactId>samza-aws</artifactId> + <version>${samza.version}</version> + </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.1</version> http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/src/main/config/kinesis-hello-samza.properties ---------------------------------------------------------------------- diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties new file mode 100644 index 0000000..17203df --- /dev/null +++ b/src/main/config/kinesis-hello-samza.properties @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Job +job.factory.class=org.apache.samza.job.yarn.YarnJobFactory +job.name=kinesis-hello-samza + +job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory + +# YARN +yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz +yarn.container.count=2 + +# Task +task.class=samza.examples.kinesis.KinesisHelloSamza +# Please replace the below input stream with the stream you plan to consume from. +task.inputs=kinesis.kinesis-samza-sample-stream + +# Serializers +serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory + +# Kinesis System +systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory +# Please replace the below with the region of your Kinesis data stream. +systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1 +# Access key below is a dummy key for instructional purposes. Please replace with your own key. +systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ +# Secret key below is a dummy key for instructional purposes. Please replace with your own key. 
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP +systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza + +# Kafka System +systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory +systems.kafka.samza.msg.serde=json +systems.kafka.consumer.zookeeper.connect=localhost:2181/ +systems.kafka.producer.bootstrap.servers=localhost:9092 + +# Job Coordinator +job.coordinator.system=kafka +job.coordinator.replication.factor=1 http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java ---------------------------------------------------------------------- diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java new file mode 100644 index 0000000..4fadf78 --- /dev/null +++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package samza.examples.kinesis; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamTask; +import org.apache.samza.task.TaskCoordinator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A sample task which consumes messages from a Kinesis stream and logs each message's key and its lag relative to the approximate arrival timestamp. + */ +public class KinesisHelloSamza implements StreamTask { + private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class); + + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope; + long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime(); + LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs)); + } +} \ No newline at end of file
