Tests for kafkaUtils and KafkaEntranceProcessor, minor changes in classes

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/43f69c62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/43f69c62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/43f69c62

Branch: refs/heads/master
Commit: 43f69c6235066ba8cfc49e8079621b71c4f43f4c
Parents: 0db63b9
Author: pwawrzyniak <[email protected]>
Authored: Fri Mar 24 14:34:49 2017 +0100
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .gitignore                                      |  31 +-
 samoa-api/pom.xml                               | 289 ++++++++++---------
 .../samoa/streams/kafka/KafkaDeserializer.java  |  23 +-
 .../kafka/KafkaDestinationProcessor.java        |  23 +-
 .../streams/kafka/KafkaEntranceProcessor.java   |  28 ++
 .../samoa/streams/kafka/KafkaJsonMapper.java    |  41 ++-
 .../samoa/streams/kafka/KafkaSerializer.java    |  23 +-
 .../apache/samoa/streams/kafka/KafkaUtils.java  |  79 ++++-
 .../kafka/KafkaEntranceProcessorTest.java       | 212 ++++++++++++++
 .../samoa/streams/kafka/KafkaUtilsTest.java     | 235 +++++++++++++++
 .../samoa/streams/kafka/TestUtilsForKafka.java  | 132 +++++++++
 11 files changed, 945 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 294c718..a834232 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,15 +1,16 @@
-#maven
-target/
-
-#eclipse
-.classpath
-.project
-.settings/
-
-#DS_Store
-.DS_Store
-
-#intellij
-.idea/
-*.iml
-*.iws
+#maven
+target/
+
+#eclipse
+.classpath
+.project
+.settings/
+
+#DS_Store
+.DS_Store
+
+#intellij
+.idea/
+*.iml
+*.iws
+/samoa-api/nbproject/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index 4621b93..2b7bd22 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -1,135 +1,154 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  #%L
-  SAMOA
-  %%
-  Copyright (C) 2014 - 2015 Apache Software Foundation
-  %%
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-#L%
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    </properties>
-
-    <name>samoa-api</name>
-    <description>API and algorithms for SAMOA</description>
-
-    <artifactId>samoa-api</artifactId>
-    <parent>
-        <groupId>org.apache.samoa</groupId>
-        <artifactId>samoa</artifactId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-    </parent>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.yammer.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>${metrics-core.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>net.jcip</groupId>
-            <artifactId>jcip-annotations</artifactId>
-            <version>${jcip-annotations.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <version>${commons-lang3.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.github.javacliparser</groupId>
-            <artifactId>javacliparser</artifactId>
-            <version>${javacliparser.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.samoa</groupId>
-            <artifactId>samoa-instances</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.esotericsoftware.kryo</groupId>
-            <artifactId>kryo</artifactId>
-            <version>${kryo.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.dreizak</groupId>
-            <artifactId>miniball</artifactId>
-            <version>${miniball.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>test</scope>
-        </dependency>
-        
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>0.10.2.0</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <version>${maven-dependency-plugin.version}</version>
-                <executions>
-                    <execution>
-                        <id>copy-dependencies</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>copy-dependencies</goal>
-                        </goals>
-                        <configuration>
-                            
<outputDirectory>${project.build.directory}/lib</outputDirectory>
-                            <overWriteReleases>false</overWriteReleases>
-                            <overWriteSnapshots>false</overWriteSnapshots>
-                            <overWriteIfNewer>true</overWriteIfNewer>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2014 - 2015 Apache Software Foundation
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+#L%
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-api</name>
+    <description>API and algorithms for SAMOA</description>
+
+    <artifactId>samoa-api</artifactId>
+    <parent>
+        <groupId>org.apache.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>${metrics-core.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>net.jcip</groupId>
+            <artifactId>jcip-annotations</artifactId>
+            <version>${jcip-annotations.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.javacliparser</groupId>
+            <artifactId>javacliparser</artifactId>
+            <version>${javacliparser.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.samoa</groupId>
+            <artifactId>samoa-instances</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dreizak</groupId>
+            <artifactId>miniball</artifactId>
+            <version>${miniball.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.2.0</version>
+        </dependency>
+            <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.10.2.0</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+        <dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka_2.11</artifactId>
+    <version>0.10.2.0</version>
+</dependency>
+        <dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka_2.11</artifactId>
+    <version>0.10.2.0</version>
+    <classifier>test</classifier>
+    <scope>test</scope>
+</dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
index b85ec1f..7b11cbd 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
@@ -13,7 +13,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.samoa.streams.kafka;
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 
 import org.apache.samoa.core.ContentEvent;
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
index 5632b6e..67dfbaa 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -13,7 +13,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.samoa.streams.kafka;
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 
 import java.util.Properties;
 import java.util.logging.Level;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index d0a4c0d..2b0b808 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -15,6 +15,27 @@
  */
 package org.apache.samoa.streams.kafka;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -97,4 +118,11 @@ public class KafkaEntranceProcessor implements 
EntranceProcessor {
         KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor;
         return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), 
kep.deserializer, kep.topic);
     }
+
+    @Override
+    protected void finalize() throws Throwable {
+        kafkaUtils.closeConsumer();
+        super.finalize(); //To change body of generated methods, choose Tools 
| Templates.
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
index 6ede447..1996b40 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
@@ -15,12 +15,40 @@
  */
 package org.apache.samoa.streams.kafka;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.InstanceCreator;
+import java.lang.reflect.Type;
 import java.nio.charset.Charset;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.instances.InstanceData;
+import org.apache.samoa.instances.SingleClassInstanceData;
 import org.apache.samoa.learners.InstanceContentEvent;
 
 /**
- * Sample class for serializing and deserializing InsatnceContentEvent from/to 
JSON format
+ * Sample class for serializing and deserializing {@link InstanceContentEvent} 
from/to JSON format
  * @author pwawrzyniak
  * @version 0.5.0-incubating-SNAPSHOT
  * @since 0.5.0-incubating
@@ -35,7 +63,7 @@ public class KafkaJsonMapper implements 
KafkaDeserializer<InstanceContentEvent>,
      * @param charset Charset to be used for bytes parsing
      */
     public KafkaJsonMapper(Charset charset){
-        this.gson = new Gson();
+        this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, 
new InstanceDataCreator()).create();        
         this.charset = charset;
     }
     
@@ -49,4 +77,13 @@ public class KafkaJsonMapper implements 
KafkaDeserializer<InstanceContentEvent>,
         return gson.toJson(message).getBytes(this.charset);
     }
     
+    public class InstanceDataCreator implements InstanceCreator<InstanceData>{
+
+        @Override
+        public InstanceData createInstance(Type type) {            
+            return new SingleClassInstanceData();
+        }
+        
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
index a8cc0b8..ad6bd8e 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
@@ -13,7 +13,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.samoa.streams.kafka;
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 
 import org.apache.samoa.core.ContentEvent;
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index 24783d4..f5227d3 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -15,19 +15,45 @@
  */
 package org.apache.samoa.streams.kafka;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
- * Internal class responsible for Kafka Stream handling (both consume and 
produce)
+ * Internal class responsible for Kafka Stream handling (both consume and
+ * produce)
  *
  * @author pwawrzyniak
  * @version 0.5.0-incubating-SNAPSHOT
@@ -36,24 +62,25 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 class KafkaUtils {
 
     // Consumer class for internal use to retrieve messages from Kafka
-    private KafkaConsumer<String, byte[]> consumer;
+    private transient KafkaConsumer<String, byte[]> consumer;
 
-    private KafkaProducer<String, byte[]> producer;
+    private transient KafkaProducer<String, byte[]> producer;
 
     // Properties of the consumer, as defined in Kafka documentation
     private final Properties consumerProperties;
     private final Properties producerProperties;
 
     // Timeout for Kafka Consumer    
-    private int consumerTimeout;
+    private long consumerTimeout;
 
     /**
      * Class constructor
+     *
      * @param consumerProperties Properties of consumer
      * @param producerProperties Properties of producer
      * @param consumerTimeout Timeout for consumer poll requests
      */
-    public KafkaUtils(Properties consumerProperties, Properties 
producerProperties, int consumerTimeout) {
+    public KafkaUtils(Properties consumerProperties, Properties 
producerProperties, long consumerTimeout) {
         this.consumerProperties = consumerProperties;
         this.producerProperties = producerProperties;
         this.consumerTimeout = consumerTimeout;
@@ -66,7 +93,9 @@ class KafkaUtils {
     }
 
     /**
-     * Method used to initialize Kafka Consumer, i.e. instantiate it and 
subscribe to configured topic
+     * Method used to initialize Kafka Consumer, i.e. instantiate it and
+     * subscribe to configured topic
+     *
      * @param topics List of Kafka topics that consumer should subscribe to
      */
     public void initializeConsumer(Collection<String> topics) {
@@ -75,19 +104,29 @@ class KafkaUtils {
             consumer = new KafkaConsumer<>(consumerProperties);
         }
         consumer.subscribe(topics);
+//        consumer.seekToBeginning(consumer.assignment());
+    }
+
+    public void closeConsumer() {
+        if (consumer != null) {
+            consumer.unsubscribe();
+            consumer.close();
+        }
     }
 
-    public void initializeProducer(){
+    public void initializeProducer() {
         // lazy instantiation
-        if(producer==null){
+        if (producer == null) {
             producer = new KafkaProducer<>(producerProperties);
-        }        
+        }
     }
-    
+
     /**
      * Method for reading new messages from Kafka topics
+     *
      * @return Collection of read messages
-     * @throws Exception Exception is thrown when consumer was not initialized 
or is not subscribed to any topic.
+     * @throws Exception Exception is thrown when consumer was not initialized
+     * or is not subscribed to any topic.
      */
     public List<byte[]> getKafkaMessages() throws Exception {
 
@@ -107,16 +146,24 @@ class KafkaUtils {
     private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> 
poll) {
         Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator();
         List<byte[]> ret = new ArrayList<>();
-        while(iterator.hasNext()){
+        while (iterator.hasNext()) {
             ret.add(iterator.next().value());
         }
         return ret;
     }
-    
-    public void sendKafkaMessage(String topic, byte[] message){
-        if(producer!=null){
-            producer.send(new ProducerRecord<String, byte[]>(topic, message));
+
+    public long sendKafkaMessage(String topic, byte[] message) {
+        if (producer != null) {
+            try{
+            ProducerRecord<String, byte[]> record = new ProducerRecord(topic, 
message);
+            long offset = producer.send(record).get(10, 
TimeUnit.SECONDS).offset();
             producer.flush();
+            return offset;
+            } catch(InterruptedException | ExecutionException | 
TimeoutException e){
+                Logger.getLogger(KafkaUtils.class.getName()).log(Level.SEVERE, 
null, e);
+            }
+            
         }
+        return -1;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
new file mode 100644
index 0000000..2a92a31
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Expectations;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.core.FastVector;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.streams.InstanceStream;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaEntranceProcessorTest {
+
+//    @Tested
+//    private KafkaEntranceProcessor kep;
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "test";
+    private static final int NUM_INSTANCES = 500;
+    
+    
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+    
+
+    public KafkaEntranceProcessorTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topic
+        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() throws IOException {
+
+    }
+
+    @After
+    public void tearDown() {
+
+    }
+
+    @Test
+    public void testFetchingNewData() throws InterruptedException, 
ExecutionException, TimeoutException {
+
+        Logger logger = 
Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
+        Properties props = TestUtilsForKafka.getConsumerProperties();
+        props.setProperty("auto.offset.reset", "earliest");
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 
10000, new KafkaJsonMapper(Charset.defaultCharset()));
+        kep.onCreate(1);
+
+//         prepare new thread for data producing
+        Thread th = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties());
+
+                Random r = new Random();
+                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+                Gson gson = new Gson();
+                int i = 0;
+                for (i = 0; i < NUM_INSTANCES; i++) {
+                    try {
+                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
+                        long stat = producer.send(record).get(10, 
TimeUnit.DAYS).offset();
+                        Thread.sleep(5);
+                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+                    } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
+                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
+                    }
+                }
+                producer.flush();
+                producer.close();
+            }
+        });
+        th.start();
+
+        int z = 0;
+        while (kep.hasNext() && z < NUM_INSTANCES) {
+            logger.log(Level.INFO, "{0} {1}", new Object[]{z++, 
kep.nextEvent().toString()});
+        }       
+
+        assertEquals("Number of sent and received instances", NUM_INSTANCES, 
z);        
+      
+     
+    }
+
+//    private Properties getProducerProperties() {
+//        Properties producerProps = new Properties();
+////                props.setProperty("zookeeper.connect", zkConnect);
+//        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+//        producerProps.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+//        producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+////        producerProps.setProperty("group.id", "test");
+//        return producerProps;
+//    }
+//
+//    private Properties getConsumerProperties() {
+//        Properties consumerProps = new Properties();
+//        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+//        consumerProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+//        consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+////        consumerProps.setProperty("group.id", "test");
+//        consumerProps.setProperty("group.id", "group0");
+//        consumerProps.setProperty("client.id", "consumer0");
+//        return consumerProps;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..4cd5135
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.instances.InstancesHeader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaUtilsTest {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC_R = "test-r";
+    private static final String TOPIC_S = "test-s";
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+
+    private Logger logger = 
Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
+    private long CONSUMER_TIMEOUT = 1000;
+
+    public KafkaUtilsTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topics
+        AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() {
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Test of initializeConsumer method, of class KafkaUtils.
+     */
+    @Test
+    public void testInitializeConsumer() throws Exception {
+        logger.log(Level.INFO, "initializeConsumer");
+        Collection<String> topics = Arrays.asList(TOPIC_R);
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(), 
TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT);
+        assertNotNull(instance);
+
+        instance.initializeConsumer(topics);
+
+        assertNotNull(instance.getKafkaMessages());
+        instance.closeConsumer();
+    }
+
+    /**
+     * Test of getKafkaMessages method, of class KafkaUtils.
+     */
+    @Test
+    public void testGetKafkaMessages() throws Exception {
+        logger.log(Level.INFO, "getKafkaMessages");
+        Collection<String> topics = Arrays.asList(TOPIC_R);
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(), 
TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT);
+        assertNotNull(instance);
+
+        logger.log(Level.INFO, "Initialising consumer");
+        instance.initializeConsumer(topics);
+
+        logger.log(Level.INFO, "Produce data");
+        List expResult = sendAndGetMessages(500);
+
+        logger.log(Level.INFO, "Get results from Kafka");
+        List<byte[]> result = instance.getKafkaMessages();
+
+        assertArrayEquals(expResult.toArray(), result.toArray());
+        instance.closeConsumer();
+    }
+
+    private List<byte[]> sendAndGetMessages(int maxNum) throws 
InterruptedException, ExecutionException, TimeoutException {
+        List<byte[]> ret;
+        try (KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test"))) {
+            ret = new ArrayList<>();
+            Random r = new Random();
+            InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+            Gson gson = new Gson();
+            int i = 0;
+            for (i = 0; i < maxNum; i++) {
+                ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
+                ret.add(record.value());
+                producer.send(record);
+            }   producer.flush();
+        }
+        return ret;
+    }
+
+    /**
+     * Test of sendKafkaMessage method, of class KafkaUtils.
+     */
+    @Test
+    public void testSendKafkaMessage() {
+        logger.log(Level.INFO, "sendKafkaMessage");
+
+        logger.log(Level.INFO, "Initialising producer");
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(), 
TestUtilsForKafka.getProducerProperties("rcv-test"), CONSUMER_TIMEOUT);
+        instance.initializeProducer();
+
+        logger.log(Level.INFO, "Initialising consumer");
+        KafkaConsumer<String, byte[]> consumer;
+        consumer = new 
KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties());
+        consumer.subscribe(Arrays.asList(TOPIC_S));
+
+        logger.log(Level.INFO, "Produce data");
+        List<byte[]> sent = new ArrayList<>();
+        Random r = new Random();
+        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+        Gson gson = new Gson();
+        for (int i = 0; i < 500; i++) {
+            byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes();
+            sent.add(val);
+            instance.sendKafkaMessage(TOPIC_S, val);
+        }
+
+        logger.log(Level.INFO, "Get results from Kafka");
+        ConsumerRecords<String, byte[]> records = 
consumer.poll(CONSUMER_TIMEOUT);
+        Iterator<ConsumerRecord<String, byte[]>> it = records.iterator();
+        List<byte[]> consumed = new ArrayList<>();
+        while (it.hasNext()) {
+            consumed.add(it.next().value());
+        }
+        consumer.close();
+
+        assertArrayEquals(sent.toArray(), consumed.toArray());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/43f69c62/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
new file mode 100644
index 0000000..0d30429
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.Properties;
+import java.util.Random;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.moa.core.FastVector;
+
+/**
+ *
+ * @author pwawrzyniak <your.name at your.org>
+ */
+public class TestUtilsForKafka {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC = "test";
+
+    protected static InstanceContentEvent getData(Random instanceRandom, int 
numAtts, InstancesHeader header) {
+        double[] attVals = new double[numAtts + 1];
+        double sum = 0.0;
+        double sumWeights = 0.0;
+        for (int i = 0; i < numAtts; i++) {
+            attVals[i] = instanceRandom.nextDouble();
+//            sum += this.weights[i] * attVals[i];
+//            sumWeights += this.weights[i];
+        }
+        int classLabel;
+        if (sum >= sumWeights * 0.5) {
+            classLabel = 1;
+        } else {
+            classLabel = 0;
+        }
+
+        Instance inst = new DenseInstance(1.0, attVals);
+        inst.setDataset(header);
+        inst.setClassValue(classLabel);
+
+        return new InstanceContentEvent(0, inst, true, false);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected static InstancesHeader generateHeader(int numAttributes) {
+        FastVector attributes = new FastVector();
+        for (int i = 0; i < numAttributes; i++) {
+            attributes.addElement(new Attribute("att" + (i + 1)));
+        }
+
+        FastVector classLabels = new FastVector();
+        for (int i = 0; i < numAttributes; i++) {
+            classLabels.addElement("class" + (i + 1));
+        }
+        attributes.addElement(new Attribute("class", classLabels));
+        InstancesHeader streamHeader = new InstancesHeader(new 
Instances("test-kafka", attributes, 0));
+        streamHeader.setClassIndex(streamHeader.numAttributes() - 1);
+        return streamHeader;
+    }
+
+    
+        protected static Properties getProducerProperties() {
+        return getProducerProperties("test");
+    }
+    
+    /**
+     *
+     * @param clientId
+     * @return
+     */
+    protected static Properties getProducerProperties(String clientId) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+        producerProps.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.setProperty("group.id", "test");
+        producerProps.setProperty("client.id", clientId);
+        return producerProps;
+    }
+
+    protected static Properties getConsumerProperties() {
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + 
BROKERPORT);
+        consumerProps.put("enable.auto.commit", "true");
+        consumerProps.put("auto.commit.interval.ms", "1000");
+        consumerProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.setProperty("group.id", "test");
+        consumerProps.setProperty("auto.offset.reset", "earliest");
+//        consumerProps.setProperty("client.id", "consumer0");
+        return consumerProps;
+    }
+}

Reply via email to