Repository: samza-hello-samza
Updated Branches:
  refs/heads/master 5fa742ac4 -> 90bbe75f2


Adding hello-samza example for kinesis

Author: Aditya Toomula <[email protected]>

Reviewers: Jagadish<[email protected]>

Closes #30 from atoomula/kinesis


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/90bbe75f
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/90bbe75f
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/90bbe75f

Branch: refs/heads/master
Commit: 90bbe75f2d12ddf2b3d538c86fb3030f2a06f589
Parents: 5fa742a
Author: Aditya Toomula <[email protected]>
Authored: Wed Dec 13 14:58:02 2017 -0800
Committer: Jagadish <[email protected]>
Committed: Wed Dec 13 14:58:02 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 pom.xml                                         |  5 ++
 src/main/config/kinesis-hello-samza.properties  | 54 ++++++++++++++++++++
 .../examples/kinesis/KinesisHelloSamza.java     | 42 +++++++++++++++
 4 files changed, 102 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 9d1f543..21793c4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -48,6 +48,7 @@ dependencies {
     compile(group: 'org.schwering', name: 'irclib', version: '1.10')
     compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
     compile(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
+    compile(group: 'org.apache.samza', name: 'samza-aws', version: "$SAMZA_VERSION")
 
     explode (group: 'org.apache.samza', name: 'samza-shell',  ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index da7ec90..a933038 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,11 @@ under the License.
       <version>${samza.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.samza</groupId>
+      <artifactId>samza-aws</artifactId>
+      <version>${samza.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.11</artifactId>
       <version>0.10.1.1</version>

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/src/main/config/kinesis-hello-samza.properties
----------------------------------------------------------------------
diff --git a/src/main/config/kinesis-hello-samza.properties b/src/main/config/kinesis-hello-samza.properties
new file mode 100644
index 0000000..17203df
--- /dev/null
+++ b/src/main/config/kinesis-hello-samza.properties
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Job
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+job.name=kinesis-hello-samza
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+
+# YARN
+yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
+yarn.container.count=2
+
+# Task
+task.class=samza.examples.kinesis.KinesisHelloSamza
+# Please replace the below input stream with the stream you plan to consume from.
+task.inputs=kinesis.kinesis-samza-sample-stream
+
+# Serializers
+serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
+
+# Kinesis System
+systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
+# Please replace the below with the region of your Kinesis data stream.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.region=us-west-1
+# Access key below is a dummy key for instructional purposes. Please replace with your own key.
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.accessKey=AKIAIHSMRK3Q72O8TEXQ
+# Secret key below is a dummy key for instructional purposes. Please replace with your own key.
+sensitive.systems.kinesis.streams.kinesis-samza-sample-stream.aws.secretKey=9GuEqdY+gNXXGrOQyev8XKziY+sRB1ht91jloEyP
+systems.kinesis.streams.kinesis-samza-sample-stream.aws.kcl.TableName=kinesis-hello-samza
+
+# Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.samza.msg.serde=json
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Job Coordinator
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/90bbe75f/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
new file mode 100644
index 0000000..4fadf78
--- /dev/null
+++ b/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.kinesis;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.kinesis.consumer.KinesisIncomingMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A sample task which consumes messages from kinesis stream and logs the message content.
+ */
+public class KinesisHelloSamza implements StreamTask {
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisHelloSamza.class);
+
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    KinesisIncomingMessageEnvelope kEnvelope = (KinesisIncomingMessageEnvelope) envelope;
+    long lagMs = System.currentTimeMillis() - kEnvelope.getApproximateArrivalTimestamp().getTime();
+    LOG.info(String.format("Kinesis message key: %s Lag: %d ms", envelope.getKey(), lagMs));
+  }
+}
\ No newline at end of file

Reply via email to