Repository: incubator-samoa
Updated Branches:
  refs/heads/master 17733b5e6 -> 804eac8c0 (forced update)


SAMOA-65: Apache Kafka integration components for SAMOA
Fix #59


Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/804eac8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/804eac8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/804eac8c

Branch: refs/heads/master
Commit: 804eac8c066e2f177e4f9c4682452fc3735699ea
Parents: 26c2191
Author: pwawrzyniak <[email protected]>
Authored: Tue Mar 14 17:43:25 2017 +0100
Committer: Gianmarco De Francisci Morales <[email protected]>
Committed: Wed Jul 26 11:54:31 2017 +0300

----------------------------------------------------------------------
 .gitignore                                      |  31 +--
 pom.xml                                         |   2 +-
 samoa-api/pom.xml                               | 258 ++++++++++---------
 .../streams/kafka/KafkaConsumerThread.java      | 174 +++++++++++++
 .../samoa/streams/kafka/KafkaDeserializer.java  |  51 ++++
 .../kafka/KafkaDestinationProcessor.java        | 100 +++++++
 .../streams/kafka/KafkaEntranceProcessor.java   | 126 +++++++++
 .../samoa/streams/kafka/KafkaSerializer.java    |  52 ++++
 .../apache/samoa/streams/kafka/KafkaUtils.java  | 142 ++++++++++
 .../java/org/apache/samoa/tasks/KafkaTask.java  | 199 ++++++++++++++
 samoa-api/src/main/resources/kafka.avsc         | 106 ++++++++
 .../kafka/KafkaDestinationProcessorTest.java    | 175 +++++++++++++
 .../kafka/KafkaEntranceProcessorTest.java       | 185 +++++++++++++
 .../samoa/streams/kafka/KafkaUtilsTest.java     | 247 ++++++++++++++++++
 .../samoa/streams/kafka/OosTestSerializer.java  |  77 ++++++
 .../samoa/streams/kafka/TestUtilsForKafka.java  | 136 ++++++++++
 .../org/apache/samoa/utils/SystemsUtils.java    |   6 +-
 17 files changed, 1935 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 294c718..a834232 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,15 +1,16 @@
-#maven
-target/
-
-#eclipse
-.classpath
-.project
-.settings/
-
-#DS_Store
-.DS_Store
-
-#intellij
-.idea/
-*.iml
-*.iws
+#maven
+target/
+
+#eclipse
+.classpath
+.project
+.settings/
+
+#DS_Store
+.DS_Store
+
+#intellij
+.idea/
+*.iml
+*.iws
+/samoa-api/nbproject/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ecc713d..90d6a5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
         <jcip-annotations.version>1.0</jcip-annotations.version>
         <jmockit.version>1.13</jmockit.version>
         <junit.version>4.10</junit.version>
-        <kafka.version>0.8.1</kafka.version>
+        <kafka.version>0.10.2.0</kafka.version>
         <kryo.version>2.21</kryo.version>
         <metrics-core.version>2.2.0</metrics-core.version>
         <miniball.version>1.0.3</miniball.version>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index 9f69e20..e2e007a 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -8,122 +8,150 @@
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at
-  
+
        http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  #L%
-  -->
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+#L%
+-->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
-  <name>samoa-api</name>
-  <description>API and algorithms for SAMOA</description>
-
-  <artifactId>samoa-api</artifactId>
-  <parent>
-    <groupId>org.apache.samoa</groupId>
-    <artifactId>samoa</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
-  </parent>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-      <version>${metrics-core.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>net.jcip</groupId>
-      <artifactId>jcip-annotations</artifactId>
-      <version>${jcip-annotations.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-      <version>${commons-lang3.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.github.javacliparser</groupId>
-      <artifactId>javacliparser</artifactId>
-      <version>${javacliparser.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.samoa</groupId>
-      <artifactId>samoa-instances</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.esotericsoftware.kryo</groupId>
-      <artifactId>kryo</artifactId>
-      <version>${kryo.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.dreizak</groupId>
-      <artifactId>miniball</artifactId>
-      <version>${miniball.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-minicluster</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <version>${maven-dependency-plugin.version}</version>
-        <executions>
-          <execution>
-            <id>copy-dependencies</id>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <outputDirectory>${project.build.directory}/lib</outputDirectory>
-              <overWriteReleases>false</overWriteReleases>
-              <overWriteSnapshots>false</overWriteSnapshots>
-              <overWriteIfNewer>true</overWriteIfNewer>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-api</name>
+    <description>API and algorithms for SAMOA</description>
+
+    <artifactId>samoa-api</artifactId>
+    <parent>
+        <groupId>org.apache.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${metrics-core.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>net.jcip</groupId>
+            <artifactId>jcip-annotations</artifactId>
+            <version>${jcip-annotations.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.javacliparser</groupId>
+            <artifactId>javacliparser</artifactId>
+            <version>${javacliparser.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.samoa</groupId>
+            <artifactId>samoa-instances</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dreizak</groupId>
+            <artifactId>miniball</artifactId>
+            <version>${miniball.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
new file mode 100644
index 0000000..fbd3ec6
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ * Background thread that owns a {@link KafkaConsumer}, polls the subscribed
+ * topics until {@link #close()} is called, and hands the received message
+ * values to readers through {@link #getKafkaMessages()}.
+ *
+ * @author pwawrzyniak
+ */
+class KafkaConsumerThread extends Thread {
+
+    private static final Logger log = Logger.getLogger(KafkaConsumerThread.class.getName());
+
+    // Consumer class for internal use to retrieve messages from Kafka.
+    // It is created, polled and closed only from run(), i.e. on this thread.
+    private transient KafkaConsumer<String, byte[]> consumer;
+
+    private final Properties consumerProperties;
+    private final Collection<String> topics;
+    private final long consumerTimeout;
+    // Message values fetched from Kafka but not yet handed out; guarded by lock
+    private final List<byte[]> buffer;
+    // Monitor guarding buffer; also used to signal readers waiting for data
+    private final Object lock;
+    // Written by close() from another thread and read by the polling loop,
+    // so it must be volatile for the stop request to become visible
+    private volatile boolean running;
+
+    /**
+     * Class constructor
+     *
+     * @param consumerProperties Properties of Consumer
+     * @param topics Topics to fetch (subscribe)
+     * @param consumerTimeout Timeout for data polling
+     */
+    KafkaConsumerThread(Properties consumerProperties, Collection<String> topics, long consumerTimeout) {
+        this.running = false;
+        this.consumerProperties = consumerProperties;
+        this.topics = topics;
+        this.consumerTimeout = consumerTimeout;
+        this.buffer = new ArrayList<>();
+        this.lock = new Object();
+    }
+
+    /**
+     * Creates and subscribes the consumer, polls Kafka until {@link #close()}
+     * is called, then releases the consumer. Cleanup also runs when
+     * initialization fails, so the consumer is never leaked.
+     */
+    @Override
+    public void run() {
+        try {
+            initializeConsumer();
+            while (running) {
+                fetchDataFromKafka();
+            }
+        } finally {
+            cleanUp();
+        }
+    }
+
+    /**
+     * Method for fetching data from Apache Kafka. Polls once (waiting at most
+     * consumerTimeout) and moves the received values into the buffer.
+     */
+    private void fetchDataFromKafka() {
+        if (consumer != null && !consumer.subscription().isEmpty()) {
+            try {
+                List<byte[]> kafkaMsg = getMessagesBytes(consumer.poll(consumerTimeout));
+                fillBufferAndNotifyWaits(kafkaMsg);
+            } catch (Throwable t) {
+                // Log and keep the polling loop alive; run() calls this again while running
+                log.log(Level.SEVERE, null, t);
+            }
+        }
+    }
+
+    /**
+     * Copies received messages to class buffer and notifies Processor to grab
+     * the data.
+     *
+     * @param kafkaMsg Messages received from Kafka
+     */
+    private void fillBufferAndNotifyWaits(List<byte[]> kafkaMsg) {
+        synchronized (lock) {
+            buffer.addAll(kafkaMsg);
+            if (!buffer.isEmpty()) {
+                lock.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Releases the consumer (close() runs even if unsubscribe() throws) and
+     * wakes any reader still waiting, since no more data will arrive.
+     */
+    private void cleanUp() {
+        try {
+            if (consumer != null) {
+                try {
+                    consumer.unsubscribe();
+                } finally {
+                    consumer.close();
+                }
+            }
+        } finally {
+            wakeUpReaders();
+        }
+    }
+
+    /**
+     * Lazily creates the consumer, marks the thread as running and subscribes
+     * to the configured topics.
+     */
+    private void initializeConsumer() {
+        log.log(Level.INFO, "Instantiating Kafka consumer");
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(consumerProperties);
+            running = true;
+        }
+        consumer.subscribe(topics);
+    }
+
+    /**
+     * Extracts the value of every record in a poll result, preserving order.
+     *
+     * @param poll records returned by a single poll
+     * @return list of record values
+     */
+    private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) {
+        List<byte[]> ret = new ArrayList<>(poll.count());
+        for (ConsumerRecord<String, byte[]> record : poll) {
+            ret.add(record.value());
+        }
+        return ret;
+    }
+
+    /**
+     * Requests the polling loop to stop after its current poll and wakes
+     * readers blocked in {@link #getKafkaMessages()}.
+     */
+    void close() {
+        running = false;
+        wakeUpReaders();
+    }
+
+    /**
+     * Returns and clears all buffered messages. If the buffer is empty, waits
+     * once for a notification (new data, close or cleanup) and may therefore
+     * return an empty list.
+     *
+     * @return snapshot of the buffered messages, possibly empty
+     */
+    List<byte[]> getKafkaMessages() {
+        synchronized (lock) {
+            if (buffer.isEmpty()) {
+                try {
+                    // block the call until new messages are received
+                    lock.wait();
+                } catch (InterruptedException ex) {
+                    // restore the interrupt status for the caller
+                    Thread.currentThread().interrupt();
+                    log.log(Level.SEVERE, null, ex);
+                }
+            }
+            List<byte[]> ret = new ArrayList<>(buffer);
+            buffer.clear();
+            return ret;
+        }
+    }
+
+    // Wakes every thread waiting on lock in getKafkaMessages()
+    private void wakeUpReaders() {
+        synchronized (lock) {
+            lock.notifyAll();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
new file mode 100644
index 0000000..459c491
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Strategy that turns a raw Apache Kafka message value into a SAMOA
+ * {@link ContentEvent} that can be injected into a topology.
+ *
+ * @author pwawrzyniak
+ * @param <T> concrete {@link ContentEvent} type produced by this deserializer
+ */
+public interface KafkaDeserializer<T extends ContentEvent> {
+
+    // TODO: Consider key-value schema?
+    /**
+     * Builds an event from one Kafka message.
+     *
+     * @param message bytes of the message exactly as received from Apache Kafka
+     * @return the event decoded from {@code message}, handed on to the topology
+     */
+    T deserialize(byte[] message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
new file mode 100644
index 0000000..231e25d
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Destination processor that writes data to Apache Kafka
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaDestinationProcessor implements Processor {
+
+    private static final Logger logger = Logger.getLogger(KafkaDestinationProcessor.class.getName());
+
+    private final KafkaUtils kafkaUtils;
+    private final String topic;
+    private final KafkaSerializer serializer;
+
+    /**
+     * Class constructor
+     * @param props Properties of Kafka Producer
+     * @param topic Topic this destination processor will write into
+     * @param serializer Implementation of KafkaSerializer that handles arriving data serialization
+     * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a>
+     */
+    public KafkaDestinationProcessor(Properties props, String topic, KafkaSerializer serializer) {
+        this.kafkaUtils = new KafkaUtils(null, props, 0);
+        this.topic = topic;
+        this.serializer = serializer;
+    }
+
+    // Used by newProcessor() to build a copy with its own KafkaUtils instance
+    private KafkaDestinationProcessor(KafkaUtils kafkaUtils, String topic, KafkaSerializer serializer) {
+        this.kafkaUtils = kafkaUtils;
+        this.topic = topic;
+        this.serializer = serializer;
+    }
+
+    /**
+     * Serializes the event and sends it to the configured topic.
+     *
+     * @param event event arriving from the topology
+     * @return true if sending succeeded, false if serialization or sending threw
+     */
+    @Override
+    @SuppressWarnings("unchecked") // serializer is a raw KafkaSerializer; its event type is unknown here
+    public boolean process(ContentEvent event) {
+        try {
+            kafkaUtils.sendKafkaMessage(topic, serializer.serialize(event));
+        } catch (Exception ex) {
+            logger.log(Level.SEVERE, null, ex);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates the Kafka producer used by {@link #process(ContentEvent)}.
+     */
+    @Override
+    public void onCreate(int id) {
+        kafkaUtils.initializeProducer();
+    }
+
+    /**
+     * Copies the given processor, giving the copy a new KafkaUtils built from
+     * the original's and sharing its topic and serializer.
+     */
+    @Override
+    public Processor newProcessor(Processor processor) {
+        KafkaDestinationProcessor kdp = (KafkaDestinationProcessor) processor;
+        return new KafkaDestinationProcessor(new KafkaUtils(kdp.kafkaUtils), kdp.topic, kdp.serializer);
+    }
+
+    /**
+     * Closes the producer; super.finalize() runs even if closing throws.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            kafkaUtils.closeProducer();
+        } finally {
+            super.finalize();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
new file mode 100644
index 0000000..866a457
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a>
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaEntranceProcessor implements EntranceProcessor {
+
+    // NOTE(review): transient means this field is not restored if an instance is
+    // Java-serialized and deserialized; newProcessor() would then copy a null.
+    // Confirm how processor instances are shipped to workers.
+    private final transient KafkaUtils kafkaUtils;
+    // Message values fetched from Kafka but not yet turned into events; created in onCreate()
+    private List<byte[]> buffer;
+    private final KafkaDeserializer deserializer;
+    private final String topic;
+
+    /**
+     * Class constructor
+     * @param props Properties of Kafka consumer
+     * @param topic Topic from which the messages should be read
+     * @param timeout Timeout used when polling Kafka for new messages
+     * @param deserializer Instance of the implementation of {@link KafkaDeserializer}
+     * @see <a href="https://kafka.apache.org/documentation/#newconsumerconfigs">Apache Kafka consumer configuration</a>
+     */
+    public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) {
+        this.kafkaUtils = new KafkaUtils(props, null, timeout);
+        this.deserializer = deserializer;
+        this.topic = topic;
+    }
+
+    // Used by newProcessor() to build a copy with its own KafkaUtils instance
+    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) {
+        this.kafkaUtils = kafkaUtils;
+        this.deserializer = deserializer;
+        this.topic = topic;
+    }
+
+    /**
+     * Allocates the local message buffer and starts consuming {@code topic}.
+     */
+    @Override
+    public void onCreate(int id) {
+        this.buffer = new ArrayList<>(100);
+        this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic));
+    }
+
+    /**
+     * Always returns false: this processor never reports completion.
+     */
+    @Override
+    public boolean isFinished() {
+        return false;
+    }
+
+    /**
+     * Refills the local buffer from Kafka when it is empty. Whether this call
+     * blocks depends on KafkaUtils.getKafkaMessages() — TODO confirm.
+     *
+     * @return true if at least one message is buffered
+     */
+    @Override
+    public boolean hasNext() {
+        if (buffer.isEmpty()) {
+            try {
+                buffer.addAll(kafkaUtils.getKafkaMessages());
+            } catch (Exception ex) {
+                Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex);
+            }
+        }
+        return buffer.size() > 0;
+    }
+
+    /**
+     * Deserializes and removes the oldest buffered message. Callers must check
+     * {@link #hasNext()} first; on an empty buffer remove(0) throws
+     * IndexOutOfBoundsException.
+     */
+    @Override
+    public ContentEvent nextEvent() {
+        return this.deserializer.deserialize(buffer.remove(0));
+    }
+
+    /**
+     * Not used: events enter the topology via nextEvent(). Always returns false.
+     */
+    @Override
+    public boolean process(ContentEvent event) {
+        return false;
+    }
+
+    /**
+     * Copies the given processor, giving the copy a new KafkaUtils built from
+     * the original's and sharing its deserializer and topic.
+     */
+    @Override
+    public Processor newProcessor(Processor processor) {
+        KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor;
+        return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic);
+    }
+
+    /**
+     * Closes the consumer; super.finalize() runs even if closing throws.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            kafkaUtils.closeConsumer();
+        } finally {
+            super.finalize();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
new file mode 100644
index 0000000..2bbc259
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Strategy that encodes a SAMOA {@link ContentEvent} leaving a topology into
+ * the bytes written to Apache Kafka.
+ *
+ * @author pwawrzyniak
+ * @param <T> concrete {@link ContentEvent} type accepted by this serializer
+ */
+public interface KafkaSerializer<T extends ContentEvent> {
+
+    // TODO: Consider Key-Value schema?
+    /**
+     * Encodes one event for sending to Kafka.
+     *
+     * @param message event received from the topology
+     * @return the encoded bytes of {@code message}
+     */
+    byte[] serialize(T message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
new file mode 100644
index 0000000..fb3aef7
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+/**
+ * Internal class responsible for Kafka stream handling (both consuming and
+ * producing).
+ *
+ * <p>Consuming is delegated to a {@link KafkaConsumerThread} started by
+ * {@link #initializeConsumer(Collection)}; the producer is created lazily by
+ * {@link #initializeProducer()}.
+ *
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+class KafkaUtils {
+
+    private static final Logger LOGGER = Logger.getLogger(KafkaUtils.class.getName());
+
+    // Maximum time to wait for the broker to acknowledge a single sent message
+    private static final long SEND_TIMEOUT_SECONDS = 10;
+
+    // Maximum time KafkaProducer.close() may wait for pending requests
+    private static final long PRODUCER_CLOSE_TIMEOUT_MINUTES = 1;
+
+    private transient KafkaConsumerThread kafkaConsumerThread;
+
+    private transient KafkaProducer<String, byte[]> producer;
+
+    // Properties of the consumer and producer, as defined in Kafka documentation
+    private final Properties consumerProperties;
+    private final Properties producerProperties;
+
+    // Timeout for Kafka consumer poll requests
+    private final long consumerTimeout;
+
+    /**
+     * Class constructor.
+     *
+     * @param consumerProperties Properties of consumer
+     * @param producerProperties Properties of producer
+     * @param consumerTimeout Timeout for consumer poll requests
+     */
+    public KafkaUtils(Properties consumerProperties, Properties producerProperties, long consumerTimeout) {
+        this.consumerProperties = consumerProperties;
+        this.producerProperties = producerProperties;
+        this.consumerTimeout = consumerTimeout;
+    }
+
+    /**
+     * Creates new KafkaUtils sharing the configuration of an existing instance.
+     * The consumer thread and producer are not copied; they must be
+     * initialized again on the new instance.
+     *
+     * @param kafkaUtils Instance of KafkaUtils
+     */
+    KafkaUtils(KafkaUtils kafkaUtils) {
+        this.consumerProperties = kafkaUtils.consumerProperties;
+        this.producerProperties = kafkaUtils.producerProperties;
+        this.consumerTimeout = kafkaUtils.consumerTimeout;
+    }
+
+    /**
+     * Initializes the Kafka consumer thread, i.e. instantiates it for the
+     * given topics and starts it.
+     *
+     * @param topics List of Kafka topics that consumer should subscribe to
+     */
+    public void initializeConsumer(Collection<String> topics) {
+        kafkaConsumerThread = new KafkaConsumerThread(consumerProperties, topics, consumerTimeout);
+        kafkaConsumerThread.start();
+    }
+
+    /**
+     * Closes the consumer thread. Does nothing if
+     * {@link #initializeConsumer(Collection)} was never called.
+     */
+    public void closeConsumer() {
+        if (kafkaConsumerThread != null) {
+            kafkaConsumerThread.close();
+        }
+    }
+
+    /**
+     * Creates the Kafka producer unless one already exists.
+     */
+    public void initializeProducer() {
+        // lazy instantiation
+        if (producer == null) {
+            producer = new KafkaProducer<>(producerProperties);
+        }
+    }
+
+    /**
+     * Closes the producer, if any, and drops the reference so that a later
+     * {@link #initializeProducer()} call creates a fresh instance.
+     */
+    public void closeProducer() {
+        if (producer != null) {
+            producer.close(PRODUCER_CLOSE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+            // A closed KafkaProducer cannot send again; without clearing it,
+            // initializeProducer() would keep the unusable instance forever.
+            producer = null;
+        }
+    }
+
+    /**
+     * Reads new messages from the subscribed Kafka topics.
+     *
+     * @return Collection of read messages
+     * @throws Exception Exception is thrown when consumer was not initialized
+     * or is not subscribed to any topic.
+     */
+    public List<byte[]> getKafkaMessages() throws Exception {
+        if (kafkaConsumerThread == null) {
+            throw new IllegalStateException("Kafka consumer has not been initialized");
+        }
+        return kafkaConsumerThread.getKafkaMessages();
+    }
+
+    /**
+     * Sends a single message to the given topic and waits for the broker's
+     * acknowledgement.
+     *
+     * @param topic Kafka topic to write to
+     * @param message serialized message
+     * @return offset assigned to the message, or -1 when the producer is not
+     * initialized or sending failed
+     */
+    public long sendKafkaMessage(String topic, byte[] message) {
+        if (producer == null) {
+            return -1;
+        }
+        try {
+            ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, message);
+            long offset = producer.send(record).get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS).offset();
+            producer.flush();
+            return offset;
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can react to it
+            Thread.currentThread().interrupt();
+            LOGGER.log(Level.SEVERE, "Interrupted while sending message to topic " + topic, e);
+        } catch (ExecutionException | TimeoutException e) {
+            LOGGER.log(Level.SEVERE, "Failed to send message to topic " + topic, e);
+        }
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java
new file mode 100644
index 0000000..f0597a8
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.github.javacliparser.ClassOption;
+import java.util.Properties;
+
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.samoa.streams.kafka.KafkaDeserializer;
+import org.apache.samoa.streams.kafka.KafkaDestinationProcessor;
+import org.apache.samoa.streams.kafka.KafkaEntranceProcessor;
+import org.apache.samoa.streams.kafka.KafkaSerializer;
+
+/**
+ * Kafka task: reads events from an input Kafka topic with a
+ * {@link KafkaEntranceProcessor} and writes them to an output Kafka topic with
+ * {@link KafkaDestinationProcessor}s connected through a shuffle stream.
+ *
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ *
+ */
+public class KafkaTask implements Task, Configurable {
+
+  private static final long serialVersionUID = 3984474041982397855L;
+  private static final Logger logger = LoggerFactory.getLogger(KafkaTask.class);
+
+  // Default broker address (a locally running Kafka broker on its standard port)
+  private static final String DEFAULT_BROKER = "localhost:9092";
+
+  Properties producerProps;
+  Properties consumerProps;
+  int timeout;
+  private KafkaDeserializer deserializer;
+  private KafkaSerializer serializer;
+  private String inTopic;
+  private String outTopic;
+
+  private TopologyBuilder builder;
+  private Topology kafkaTopology;
+
+  public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p',
+          "Number of destination Processors", 1, 1, Integer.MAX_VALUE);
+
+  public IntOption timeoutOption = new IntOption("timeout", 't',
+          "Kafka consumer timeout", 1, 1, Integer.MAX_VALUE);
+
+  // Value is passed to the Kafka consumer as "bootstrap.servers"
+  public StringOption inputBrokerOption = new StringOption("inputBroker", 'r', "Input brokers addresses",
+          DEFAULT_BROKER);
+
+  // Value is passed to the Kafka producer as "bootstrap.servers"
+  public StringOption outputBrokerOption = new StringOption("outputBroker", 's', "Output brokers name",
+          DEFAULT_BROKER);
+
+  public StringOption inputTopicOption = new StringOption("inputTopic", 'i', "Input topic name",
+          "inputTopic");
+
+  public StringOption outputTopicOption = new StringOption("outputTopic", 'o', "Output topic name",
+          "outputTopic");
+
+  // NOTE(review): the default is the interface itself, which cannot be
+  // instantiated; a concrete serializer presumably has to be given on the CLI.
+  public ClassOption serializerOption = new ClassOption("serializer", 'w',
+          "Serializer class name",
+          KafkaSerializer.class, KafkaSerializer.class.getName());
+
+  public ClassOption deserializerOption = new ClassOption("deserializer", 'd',
+          "Deserializer class name",
+          KafkaDeserializer.class, KafkaDeserializer.class.getName());
+
+  public StringOption taskNameOption = new StringOption("taskName", 'n', "Identifier of the task",
+          "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+  /**
+   * Class constructor (for tests purposes).
+   *
+   * <p>Values given here are kept by {@link #init()}; the command-line options
+   * are only used for values that were not provided.
+   *
+   * @param producerProps Properties of the Kafka producer used by the
+   * destination processor
+   * @param consumerProps Properties of the Kafka consumer used by the entrance
+   * processor
+   * @param inTopic Topic from which the entrance processor reads
+   * @param outTopic Topic to which the destination processor writes
+   * @param timeout Timeout used when polling Kafka for new messages
+   * @param serializer Implementation of KafkaSerializer that handles outgoing
+   * data serialization
+   * @param deserializer Implementation of KafkaDeserializer that handles
+   * arriving data deserialization
+   * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka
+   * Producer configuration</a>
+   * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka
+   * Consumer configuration</a>
+   */
+  public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) {
+    this.producerProps = producerProps;
+    this.consumerProps = consumerProps;
+    this.deserializer = deserializer;
+    this.serializer = serializer;
+    this.inTopic = inTopic;
+    this.outTopic = outTopic;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Class constructor; configuration is taken from the options in
+   * {@link #init()}.
+   */
+  public KafkaTask() {
+
+  }
+
+  /**
+   * Builds the topology. It consists of one {@link KafkaEntranceProcessor}
+   * reading from the input topic, connected by a shuffle stream to a
+   * {@link KafkaDestinationProcessor} (parallelism taken from
+   * {@code kafkaParallelismOption}) writing to the output topic.
+   */
+  @Override
+  public void init() {
+    // Fill in only what the constructor did not provide, so that the values
+    // passed to the test constructor are not silently replaced by defaults.
+    // NOTE(review): only bootstrap.servers is set here; Kafka clients usually
+    // also need (de)serializer settings -- confirm where those come from.
+    if (producerProps == null) {
+      producerProps = new Properties();
+      producerProps.setProperty("bootstrap.servers", outputBrokerOption.getValue());
+    }
+
+    if (consumerProps == null) {
+      consumerProps = new Properties();
+      consumerProps.setProperty("bootstrap.servers", inputBrokerOption.getValue());
+    }
+
+    if (serializer == null) {
+      serializer = serializerOption.getValue();
+    }
+
+    if (deserializer == null) {
+      deserializer = deserializerOption.getValue();
+    }
+
+    if (inTopic == null) {
+      inTopic = inputTopicOption.getValue();
+    }
+    if (outTopic == null) {
+      outTopic = outputTopicOption.getValue();
+    }
+
+    // timeoutOption enforces a minimum of 1, so a non-positive value means "unset"
+    if (timeout <= 0) {
+      timeout = timeoutOption.getValue();
+    }
+
+    logger.info("Invoking init");
+    if (builder == null) {
+      builder = new TopologyBuilder();
+      logger.info("Successfully instantiating TopologyBuilder");
+
+      builder.initTopology(taskNameOption.getValue());
+      logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue());
+    }
+
+    // create entrance processor
+    KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer);
+    builder.addEntranceProcessor(sourceProcessor);
+
+    // create stream
+    Stream stream = builder.createStream(sourceProcessor);
+
+    // create destination processor
+    KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, outTopic, serializer);
+    builder.addProcessor(destProcessor, kafkaParallelismOption.getValue());
+    builder.connectInputShuffleStream(stream, destProcessor);
+
+    // build topology
+    kafkaTopology = builder.build();
+    logger.info("Successfully built the topology");
+  }
+
+  @Override
+  public Topology getTopology() {
+    return kafkaTopology;
+  }
+
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    logger.info("Invoking setFactory: {}", factory);
+    builder = new TopologyBuilder(factory);
+    logger.info("Successfully instantiating TopologyBuilder");
+
+    builder.initTopology(taskNameOption.getValue());
+    logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/main/resources/kafka.avsc
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/resources/kafka.avsc b/samoa-api/src/main/resources/kafka.avsc
new file mode 100644
index 0000000..f5f12cf
--- /dev/null
+++ b/samoa-api/src/main/resources/kafka.avsc
@@ -0,0 +1,106 @@
+[
+{
+  "namespace": "org.apache.samoa.streams.kafka.temp",
+  "type": "record",
+  "name": "BurrTest",
+  "fields": [
+               {"name":"name", "type": "string"},
+               {"name":"atrs", "type": {"type": "array", "items": "string"}},
+               {"name":"nums", "type": {"type": "array", "items": "int"}},
+               {"name":"list", "type": {"type": "array", "items": "string"}}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "Instance",
+  "fields": [
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "InstanceData",
+  "fields": [
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "SingleClassInstanceData",
+  "fields": [
+               {"name":"classValue", "type": "double"}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "DenseInstanceData",
+  "fields": [
+               {"name":"attributeValues", "type": {"type": "array", "items": "double"}}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "SparseInstanceData",
+  "fields": [
+               {"name":"attributeValues", "type": {"type": "array", "items": "double"}},
+               {"name":"indexValues", "type": {"type": "array", "items": "int"}},
+               {"name":"numberAttributes", "type": "int"}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "SingleLabelInstance",
+  "fields": [
+               {"name": "weight", "type": "double"},
+               {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]},
+               {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.instances",
+  "type": "record",
+  "name": "DenseInstance",
+  "fields": [
+               {"name": "weight", "type": "double"},
+               {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]},
+               {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.core",
+  "type": "record",
+  "name": "SerializableInstance",
+  "fields": [
+               {"name": "weight", "type": "double"},
+               {"name": "instanceData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]},
+               {"name": "classData", "type": ["null", "org.apache.samoa.instances.InstanceData", "org.apache.samoa.instances.DenseInstanceData", "org.apache.samoa.instances.SparseInstanceData", "org.apache.samoa.instances.SingleClassInstanceData"]}
+       ]
+},
+{
+  "namespace": "org.apache.samoa.learners",
+  "type": "record",
+  "name": "InstanceContent",
+  "fields": [
+               {"name": "instanceIndex", "type": "long"},
+               {"name": "classifierIndex", "type": "int"},
+               {"name": "evaluationIndex", "type": "int"},
+               {"name":"instance", "type":"org.apache.samoa.core.SerializableInstance"},
+               {"name": "isTraining", "type": "boolean"},
+               {"name": "isTesting", "type": "boolean"},
+               {"name": "isLast", "type": "boolean"}
+       ]
+},
+{
+ "namespace": "org.apache.samoa.learners",
+ "type": "record",
+ "name": "InstanceContentEvent",
+ "fields": [
+     {"name": "instanceContent", "type": "org.apache.samoa.learners.InstanceContent"}
+ ]
+}
+]
+   

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
new file mode 100644
index 0000000..930ab23
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Integration test for {@link KafkaDestinationProcessor}, run against an
+ * embedded ZooKeeper and Kafka broker. Events processed by the destination
+ * processor must be readable back from the configured topic.
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaDestinationProcessorTest {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "test-kdp";
+    private static final int NUM_INSTANCES = 11111;
+    private static final int CONSUMER_TIMEOUT = 1000;
+    // Upper bound on the time spent reading the sent instances back
+    private static final long MAX_WAIT_MS = 60L * CONSUMER_TIMEOUT;
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+
+    public KafkaDestinationProcessorTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topic
+        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        // Null checks keep a failure in setUpClass from being masked by an NPE here
+        if (kafkaServer != null) {
+            kafkaServer.shutdown();
+        }
+        if (zkClient != null) {
+            zkClient.close();
+        }
+        if (zkServer != null) {
+            zkServer.shutdown();
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+
+    }
+
+    @After
+    public void tearDown() {
+
+    }
+
+    /**
+     * Sends {@link #NUM_INSTANCES} events through the destination processor
+     * and checks that the same number of records can be read from the topic.
+     */
+    @Test
+    public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException {
+
+        Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
+        KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(producerProps, TOPIC, new OosTestSerializer());
+        kdp.onCreate(1);
+
+        Random r = new Random();
+        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+        for (int sent = 0; sent < NUM_INSTANCES; sent++) {
+            InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+            kdp.process(event);
+        }
+
+        // Read back from the beginning of the topic. auto.offset.reset is a
+        // consumer setting, so it must be set here rather than on the producer.
+        // Consuming in this thread avoids sharing an unsynchronized counter
+        // between threads and a fixed sleep before the assertion.
+        Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
+        consumerProps.setProperty("auto.offset.reset", "earliest");
+        int received = 0;
+        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
+            consumer.subscribe(Arrays.asList(TOPIC));
+            long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
+            while (received < NUM_INSTANCES && System.currentTimeMillis() < deadline) {
+                received += consumer.poll(CONSUMER_TIMEOUT).count();
+            }
+        }
+
+        assertEquals("Number of sent and received instances", NUM_INSTANCES, received);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
new file mode 100644
index 0000000..55c3b85
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.InstancesHeader;
+
+/**
+ * Integration test for {@link KafkaEntranceProcessor}, run against an
+ * embedded ZooKeeper and Kafka broker. Records produced to the topic must be
+ * delivered by the entrance processor as {@link InstanceContentEvent}s.
+ *
+ * @author pwawrzyniak
+ * @author Jakub Jankowski
+ */
+public class KafkaEntranceProcessorTest {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC_OOS = "samoa_test-oos";
+    private static final int NUM_INSTANCES = 11111;
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+    private static final int TIMEOUT = 1000;
+    // Upper bound on the time spent waiting for all instances to arrive
+    private static final long MAX_WAIT_MS = 60L * TIMEOUT;
+
+    public KafkaEntranceProcessorTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topics
+        AdminUtils.createTopic(zkUtils, TOPIC_OOS, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        try {
+            kafkaServer.shutdown();
+            zkClient.close();
+            zkServer.shutdown();
+        } catch (Exception ex) {
+            Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+
+    }
+
+    @After
+    public void tearDown() {
+
+    }
+
+    /**
+     * Produces {@link #NUM_INSTANCES} serialized events on a separate thread
+     * and checks that the entrance processor delivers all of them.
+     */
+    @Test
+    public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException {
+
+        final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
+        logger.log(Level.INFO, "OOS");
+        logger.log(Level.INFO, "testFetchingNewData");
+        Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
+        props.setProperty("auto.offset.reset", "earliest");
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_OOS, TIMEOUT, new OosTestSerializer());
+
+        kep.onCreate(1);
+
+        // prepare new thread for data producing
+        Thread th = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));
+                try {
+                    Random r = new Random();
+                    InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+                    OosTestSerializer serializer = new OosTestSerializer();
+                    for (int i = 0; i < NUM_INSTANCES; i++) {
+                        InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+                        ProducerRecord<String, byte[]> record = new ProducerRecord<>(TOPIC_OOS, serializer.serialize(event));
+                        try {
+                            producer.send(record).get(10, TimeUnit.SECONDS);
+                        } catch (InterruptedException ex) {
+                            // Restore the flag and stop producing; the count check will fail
+                            Thread.currentThread().interrupt();
+                            logger.log(Level.SEVERE, null, ex);
+                            return;
+                        } catch (ExecutionException | TimeoutException ex) {
+                            logger.log(Level.SEVERE, null, ex);
+                        }
+                    }
+                    producer.flush();
+                } finally {
+                    producer.close();
+                }
+            }
+        });
+        th.start();
+
+        // hasNext() can return false before the first records reach the
+        // consumer (presumably it does not block until data arrives -- TODO
+        // confirm against KafkaEntranceProcessor), so keep checking until
+        // every instance arrived or the deadline passes.
+        long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
+        int z = 0;
+        while (z < NUM_INSTANCES && System.currentTimeMillis() < deadline) {
+            if (kep.hasNext()) {
+                // the cast checks that the deserializer yields the expected event type
+                InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
+                z++;
+            } else {
+                Thread.sleep(10);
+            }
+        }
+        th.join(MAX_WAIT_MS);
+
+        assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..186d97b
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaUtilsTest {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC_R = "test-r";
+    private static final String TOPIC_S = "test-s";
+    private static final int NUM_INSTANCES = 50;
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+
+    private static final Logger logger = 
Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
+    private final long CONSUMER_TIMEOUT = 1500;
+
+    public KafkaUtilsTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topics
+        AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() {
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Test of initializeConsumer method, of class KafkaUtils.
+     */
+    @Test
+    public void testInitializeConsumer() throws Exception {
+        logger.log(Level.INFO, "initializeConsumer");
+        Collection<String> topics = Arrays.asList(TOPIC_R);
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), 
TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), 
CONSUMER_TIMEOUT);
+        assertNotNull(instance);
+
+        instance.initializeConsumer(topics);
+        Thread.sleep(1000);
+        instance.closeConsumer();
+
+        Thread.sleep(CONSUMER_TIMEOUT);
+
+        instance.initializeConsumer(topics);
+        Thread.sleep(1000);
+        instance.closeConsumer();
+        assertTrue(true);
+    }
+
+    /**
+     * Test of getKafkaMessages method, of class KafkaUtils.
+     */
+    @Test
+    public void testGetKafkaMessages() throws Exception {
+        logger.log(Level.INFO, "getKafkaMessages");
+        Collection<String> topics = Arrays.asList(TOPIC_R);
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), 
TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), 
CONSUMER_TIMEOUT);
+        assertNotNull(instance);
+
+        logger.log(Level.INFO, "Initialising consumer");
+        instance.initializeConsumer(topics);
+
+        logger.log(Level.INFO, "Produce data");
+        List expResult = sendAndGetMessages(NUM_INSTANCES);
+
+        logger.log(Level.INFO, "Wait a moment");
+        Thread.sleep(CONSUMER_TIMEOUT);
+
+        logger.log(Level.INFO, "Get results from Kafka");
+        List<byte[]> result = instance.getKafkaMessages();
+
+        assertArrayEquals(expResult.toArray(), result.toArray());
+        instance.closeConsumer();
+    }
+
+    private List<byte[]> sendAndGetMessages(int maxNum) throws 
InterruptedException, ExecutionException, TimeoutException {
+        List<byte[]> ret;
+        try (KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test", 
BROKERHOST, BROKERPORT))) {
+            ret = new ArrayList<>();
+            Random r = new Random();
+            InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+            Gson gson = new Gson();
+            int i = 0;
+            for (i = 0; i < maxNum; i++) {
+                ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
+                ret.add(record.value());
+                producer.send(record);
+            }
+            producer.flush();
+        }
+        return ret;
+    }
+
+    /**
+     * Test of sendKafkaMessage method, of class KafkaUtils.
+     *
+     * @throws java.lang.InterruptedException
+     */
+    @Test
+    public void testSendKafkaMessage() throws InterruptedException {
+        logger.log(Level.INFO, "sendKafkaMessage");
+
+        logger.log(Level.INFO, "Initialising producer");
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), 
TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST, BROKERPORT), 
CONSUMER_TIMEOUT);
+        instance.initializeProducer();
+
+        logger.log(Level.INFO, "Initialising consumer");
+        KafkaConsumer<String, byte[]> consumer;
+        consumer = new 
KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, 
BROKERPORT));
+        consumer.subscribe(Arrays.asList(TOPIC_S));
+
+        logger.log(Level.INFO, "Produce data");
+        List<byte[]> sent = new ArrayList<>();
+        Random r = new Random();
+        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+        Gson gson = new Gson();
+        for (int i = 0; i < NUM_INSTANCES; i++) {
+            byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes();
+            sent.add(val);
+            instance.sendKafkaMessage(TOPIC_S, val);
+        }
+        // wait for Kafka a bit :)
+        Thread.sleep(2 * CONSUMER_TIMEOUT);
+
+        logger.log(Level.INFO, "Get results from Kafka");
+        
+        List<byte[]> consumed = new ArrayList<>();
+        
+        while (consumed.size() != sent.size()) {
+            ConsumerRecords<String, byte[]> records = 
consumer.poll(CONSUMER_TIMEOUT);
+            Iterator<ConsumerRecord<String, byte[]>> it = records.iterator();
+            while (it.hasNext()) {
+                consumed.add(it.next().value());
+            }
+        }
+        consumer.close();
+
+        assertArrayEquals(sent.toArray(), consumed.toArray());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
new file mode 100644
index 0000000..14535bb
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+
+/**
+ *
+ * @author Piotr Wawrzyniak
+ */
+public class OosTestSerializer implements 
KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
+
+    @Override
+    public InstanceContentEvent deserialize(byte[] message) {
+        try {
+            ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(message));
+            InstanceContentEvent ice = (InstanceContentEvent)ois.readObject();
+            return ice;
+        } catch (IOException | ClassNotFoundException ex) {
+            
Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] serialize(InstanceContentEvent message) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(message);
+            oos.flush();
+            return baos.toByteArray();            
+        } catch (IOException ex) {
+            
Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }        
+        return null;
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
new file mode 100644
index 0000000..8936759
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.Properties;
+import java.util.Random;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.moa.core.FastVector;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class TestUtilsForKafka {
+
+    protected static InstanceContentEvent getData(Random instanceRandom, int 
numAtts, InstancesHeader header) {
+        double[] attVals = new double[numAtts + 1];
+        double sum = 0.0;
+        double sumWeights = 0.0;
+        for (int i = 0; i < numAtts; i++) {
+            attVals[i] = instanceRandom.nextDouble();
+
+        }
+        int classLabel;
+        if (sum >= sumWeights * 0.5) {
+            classLabel = 1;
+        } else {
+            classLabel = 0;
+        }
+
+        Instance inst = new DenseInstance(1.0, attVals);
+        inst.setDataset(header);
+        inst.setClassValue(classLabel);
+
+        return new InstanceContentEvent(0, inst, true, false);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected static InstancesHeader generateHeader(int numAttributes) {
+        FastVector attributes = new FastVector();
+        for (int i = 0; i < numAttributes; i++) {
+            attributes.addElement(new Attribute("att" + (i + 1)));
+        }
+
+        FastVector classLabels = new FastVector();
+        for (int i = 0; i < numAttributes; i++) {
+            classLabels.addElement("class" + (i + 1));
+        }
+        attributes.addElement(new Attribute("class", classLabels));
+        InstancesHeader streamHeader = new InstancesHeader(new 
Instances("test-kafka", attributes, 0));
+        streamHeader.setClassIndex(streamHeader.numAttributes() - 1);
+        return streamHeader;
+    }
+
+    
+        protected static Properties getProducerProperties(String BROKERHOST, 
String BROKERPORT) {
+        return getProducerProperties("test", BROKERHOST, BROKERPORT);
+    }
+    
+    /**
+     *
+     * @param clientId
+     * @return
+     */
+    protected static Properties getProducerProperties(String clientId, String 
BROKERHOST, String BROKERPORT) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+        producerProps.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.setProperty("group.id", "test");
+        producerProps.setProperty("client.id", clientId);
+        return producerProps;
+    }
+
+    protected static Properties getConsumerProperties(String BROKERHOST, 
String BROKERPORT) {
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+        consumerProps.put("enable.auto.commit", "true");
+        consumerProps.put("auto.commit.interval.ms", "1000");
+        consumerProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.setProperty("group.id", "test");
+        consumerProps.setProperty("auto.offset.reset", "earliest");
+        return consumerProps;
+    }
+    
+    protected static Properties getConsumerProducerProperties(String 
BROKERHOST, String BROKERPORT) {
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        props.put("enable.auto.commit", "true");
+        props.put("auto.commit.interval.ms", "1000");
+        props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+        props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.setProperty("group.id", "burrito");
+        props.setProperty("auto.offset.reset", "earliest");
+        props.setProperty("client.id", "burrito");
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/804eac8c/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
index ad2b383..d1e3a53 100644
--- a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
+++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Properties;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.utils.ZKStringSerializer;
+import kafka.utils.ZkUtils;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
@@ -72,7 +74,9 @@ public class SystemsUtils {
      * Create Kafka topic/stream
      */
     static void createKafkaTopic(String name, int partitions, int replicas) {
-      AdminUtils.createTopic(zkClient, name, partitions, replicas, new 
Properties());
+        // Fix for Apache Kafka 0.10
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+      AdminUtils.createTopic(zkUtils, name, partitions, replicas, new 
Properties(), RackAwareMode.Disabled$.MODULE$);
     }
 
     static class ZKStringSerializerWrapper implements ZkSerializer {

Reply via email to