Initial structure of Kafka components.
Initial code for Kafka Consumer

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/4453f1f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/4453f1f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/4453f1f1

Branch: refs/heads/master
Commit: 4453f1f10df1ebe93dff7ae831162714ad37a9a3
Parents: 26c2191
Author: pwawrzyniak <[email protected]>
Authored: Tue Mar 14 17:43:25 2017 +0100
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 samoa-api/pom.xml                               | 208 ++++++++++---------
 .../samoa/streams/kafka/KafkaDeserializer.java  |  30 +++
 .../kafka/KafkaDestinationProcessor.java        |  42 ++++
 .../streams/kafka/KafkaEntranceProcessor.java   |  65 ++++++
 .../samoa/streams/kafka/KafkaSerializer.java    |  31 +++
 .../apache/samoa/streams/kafka/KafkaUtils.java  |  71 +++++++
 6 files changed, 346 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index 9f69e20..4621b93 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -11,119 +11,125 @@
   
        http://www.apache.org/licenses/LICENSE-2.0
   
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  #L%
-  -->
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+#L%
+-->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
 
-  <name>samoa-api</name>
-  <description>API and algorithms for SAMOA</description>
+    <name>samoa-api</name>
+    <description>API and algorithms for SAMOA</description>
 
-  <artifactId>samoa-api</artifactId>
-  <parent>
-    <groupId>org.apache.samoa</groupId>
-    <artifactId>samoa</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
-  </parent>
+    <artifactId>samoa-api</artifactId>
+    <parent>
+        <groupId>org.apache.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
 
-  <dependencies>
-    <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>${metrics-core.version}</version>
-    </dependency>
+    <dependencies>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${metrics-core.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>net.jcip</groupId>
-      <artifactId>jcip-annotations</artifactId>
-      <version>${jcip-annotations.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>net.jcip</groupId>
+            <artifactId>jcip-annotations</artifactId>
+            <version>${jcip-annotations.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>${commons-lang3.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>com.github.javacliparser</groupId>
-      <artifactId>javacliparser</artifactId>
-      <version>${javacliparser.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>com.github.javacliparser</groupId>
+            <artifactId>javacliparser</artifactId>
+            <version>${javacliparser.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>org.apache.samoa</groupId>
-      <artifactId>samoa-instances</artifactId>
-      <version>${project.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>org.apache.samoa</groupId>
+            <artifactId>samoa-instances</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>com.esotericsoftware.kryo</groupId>
-      <artifactId>kryo</artifactId>
-      <version>${kryo.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>com.dreizak</groupId>
-      <artifactId>miniball</artifactId>
-      <version>${miniball.version}</version>
-    </dependency>
+        <dependency>
+            <groupId>com.dreizak</groupId>
+            <artifactId>miniball</artifactId>
+            <version>${miniball.version}</version>
+        </dependency>
 
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.2.0</version>
+        </dependency>
+    </dependencies>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <version>${maven-dependency-plugin.version}</version>
-        <executions>
-          <execution>
-            <id>copy-dependencies</id>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${project.build.directory}/lib</outputDirectory>
-              <overWriteReleases>false</overWriteReleases>
-              <overWriteSnapshots>false</overWriteSnapshots>
-              <overWriteIfNewer>true</overWriteIfNewer>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
new file mode 100644
index 0000000..2c7dae1
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Converts a raw Kafka message payload back into a SAMOA {@link ContentEvent}.
+ *
+ * @author pwawrzyniak
+ * @param <T> the concrete event type produced by {@link #deserialize}
+ */
+public interface KafkaDeserializer<T extends ContentEvent> {
+    // TODO: Consider key-value schema?
+    /** Rebuilds an event of type {@code T} from the given message bytes. */
+    T deserialize(byte[] payload);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
new file mode 100644
index 0000000..ed8f164
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Kafka destination processor (not implemented yet).
+ *
+ * <p>Every method currently throws {@link UnsupportedOperationException}.
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaDestinationProcessor implements Processor {
+
+    @Override
+    public boolean process(ContentEvent event) {
+        throw notSupportedYet();
+    }
+
+    @Override
+    public void onCreate(int id) {
+        throw notSupportedYet();
+    }
+
+    @Override
+    public Processor newProcessor(Processor processor) {
+        throw notSupportedYet();
+    }
+
+    /** Creates the exception thrown by every method that has no implementation yet. */
+    private static UnsupportedOperationException notSupportedYet() {
+        return new UnsupportedOperationException("Not supported yet.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
new file mode 100644
index 0000000..228e81b
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Entrance processor intended to feed SAMOA with events read from a Kafka topic.
+ *
+ * <p>Work in progress: {@link #onCreate(int)} subscribes to the configured topic, while the
+ * remaining methods still throw {@link UnsupportedOperationException}.
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaEntranceProcessor implements EntranceProcessor {
+
+    // Constructor arguments are kept so that onCreate can (re)build the transient Kafka helper.
+    private final Properties props;
+    private final String topic;
+    // NOTE(review): KafkaUtils hands this value to KafkaConsumer.poll(long), so it acts as a
+    // poll timeout (ms) rather than a batch size -- confirm the intended meaning.
+    private final int batchSize;
+
+    // Built lazily in onCreate; transient, so it is not carried over if this object is serialized.
+    private transient KafkaUtils kafkaUtils;
+
+    /**
+     * @param props     Kafka consumer configuration, forwarded to {@link KafkaUtils}
+     * @param topic     Kafka topic subscribed to in {@link #onCreate(int)}
+     * @param batchSize value forwarded to {@link KafkaUtils} as the consumer poll timeout
+     */
+    public KafkaEntranceProcessor(Properties props, String topic, int batchSize) {
+        this.props = props;
+        this.topic = topic;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Creates the Kafka helper if necessary and subscribes its consumer to the configured topic.
+     */
+    @Override
+    public void onCreate(int id) {
+        if (kafkaUtils == null) {
+            kafkaUtils = new KafkaUtils(props, null, batchSize);
+        }
+        kafkaUtils.initializeConsumer(Collections.singletonList(topic));
+    }
+
+    @Override
+    public boolean isFinished() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public boolean hasNext() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public ContentEvent nextEvent() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public boolean process(ContentEvent event) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public Processor newProcessor(Processor processor) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
new file mode 100644
index 0000000..29f04ca
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Converts a SAMOA {@link ContentEvent} into the raw byte payload of a Kafka message.
+ *
+ * @author pwawrzyniak
+ * @param <T> the concrete event type accepted by {@link #serialize}
+ */
+public interface KafkaSerializer<T extends ContentEvent> {
+    // TODO: Consider Key-Value schema?
+
+    /** Encodes the given event as message bytes. */
+    byte[] serialize(T event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4453f1f1/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
new file mode 100644
index 0000000..c2fbaa8
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.Collection;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * Internal helper that owns the Kafka client objects used by the Kafka processors of this
+ * package.
+ *
+ * @author pwawrzyniak
+ */
+class KafkaUtils {
+
+    // Consumer used internally to retrieve messages from Kafka; created by initializeConsumer
+    private KafkaConsumer<String, byte[]> consumer;
+
+    // NOTE(review): the producer is declared but not created or used anywhere in this class yet
+    private KafkaProducer<String, byte[]> producer;
+
+    // Client configurations, as defined in the Kafka documentation
+    private final Properties consumerProperties;
+    private final Properties producerProperties;
+
+    // Timeout (in milliseconds) passed to KafkaConsumer.poll(long) by getMessages
+    private final int consumerTimeout;
+
+    /**
+     * @param consumerProperties configuration used to create the Kafka consumer
+     * @param producerProperties configuration for the Kafka producer (not used yet; may be null)
+     * @param consumerTimeout    poll timeout in milliseconds used by {@link #getMessages()}
+     */
+    public KafkaUtils(Properties consumerProperties, Properties producerProperties, int consumerTimeout) {
+        this.consumerProperties = consumerProperties;
+        this.producerProperties = producerProperties;
+        this.consumerTimeout = consumerTimeout;
+    }
+
+    /**
+     * Creates the consumer on first use and subscribes it to {@code topics}.
+     *
+     * <p>Kafka subscriptions are not incremental: each call replaces the previous subscription.
+     *
+     * @param topics topics to subscribe to
+     */
+    public void initializeConsumer(Collection<String> topics) {
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(consumerProperties);
+        }
+        consumer.subscribe(topics);
+    }
+
+    /**
+     * Polls the subscribed topics once.
+     *
+     * @return records fetched within {@code consumerTimeout} milliseconds (possibly empty)
+     * @throws IllegalStateException if {@link #initializeConsumer} has not been called, or the
+     *                               consumer is subscribed to no topics
+     */
+    public ConsumerRecords<String, byte[]> getMessages() {
+        if (consumer == null) {
+            throw new IllegalStateException("Consumer not initialised");
+        }
+        if (consumer.subscription().isEmpty()) {
+            throw new IllegalStateException("Consumer subscribed to no topics!");
+        }
+        return consumer.poll(consumerTimeout);
+    }
+
+    /**
+     * Closes the consumer, if one exists, releasing its resources. A subsequent
+     * {@link #initializeConsumer} call creates a fresh consumer.
+     */
+    public void closeConsumer() {
+        if (consumer != null) {
+            consumer.close();
+            consumer = null;
+        }
+    }
+}

Reply via email to