Rebuild of Consumer-related classes (added a separate thread that reads from
Kafka, blocking until something is received).
Major cleanup in tests.
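
For context, the heart of the change is a hand-off buffer between the new
polling thread and the processor. A minimal sketch of the pattern (illustrative
only, not part of the commit; it uses the conventional while-guard against
spurious wakeups, where KafkaConsumerThread below guards with a single if):

    import java.util.ArrayList;
    import java.util.List;

    class BlockingBuffer {
        private final List<byte[]> buffer = new ArrayList<>();
        private final Object lock = new Object();

        // called by the polling thread after each consumer.poll(...)
        void put(List<byte[]> messages) {
            synchronized (lock) {
                buffer.addAll(messages);
                if (!buffer.isEmpty()) {
                    lock.notifyAll(); // wake any blocked reader
                }
            }
        }

        // called by the processor; blocks until something is received
        List<byte[]> take() throws InterruptedException {
            synchronized (lock) {
                while (buffer.isEmpty()) {
                    lock.wait();
                }
                List<byte[]> ret = new ArrayList<>(buffer);
                buffer.clear(); // drain for the next round
                return ret;
            }
        }
    }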

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/81d29599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/81d29599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/81d29599

Branch: refs/heads/master
Commit: 81d295995722161b8b86e7c6b4f9298cd5665d8a
Parents: be32e18
Author: pwawrzyniak <[email protected]>
Authored: Fri May 12 17:17:27 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../streams/kafka/KafkaConsumerThread.java      | 157 +++++++++++++++++++
 .../streams/kafka/KafkaEntranceProcessor.java   |   2 +-
 .../apache/samoa/streams/kafka/KafkaUtils.java  |  47 ++----
 .../kafka/KafkaDestinationProcessorTest.java    |  47 ++++--
 .../kafka/KafkaEntranceProcessorTest.java       | 151 +++++++-----------
 .../samoa/streams/kafka/KafkaTaskTest.java      |  56 +++++--
 .../samoa/streams/kafka/KafkaUtilsTest.java     |  64 ++++----
 .../samoa/streams/kafka/TestUtilsForKafka.java  |  22 ++-
 8 files changed, 349 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
new file mode 100644
index 0000000..6522f67
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+class KafkaConsumerThread extends Thread {
+
+    // Consumer class for internal use to retrieve messages from Kafka
+    private transient KafkaConsumer<String, byte[]> consumer;
+
+    private Logger log = Logger.getLogger(KafkaConsumerThread.class.getName());
+
+    private final Properties consumerProperties;
+    private final Collection<String> topics;
+    private final long consumerTimeout;
+    private final List<byte[]> buffer;
+    // used to synchronize things
+    private final Object lock;
+    private boolean running;
+
+    /**
+     * Class constructor
+     *
+     * @param consumerProperties Properties of Consumer
+     * @param topics Topics to fetch (subscribe)
+     * @param consumerTimeout Timeout for data polling
+     */
+    KafkaConsumerThread(Properties consumerProperties, Collection<String> topics, long consumerTimeout) {
+        this.running = false;
+        this.consumerProperties = consumerProperties;
+        this.topics = topics;
+        this.consumerTimeout = consumerTimeout;
+        this.buffer = new ArrayList<>();
+        lock = new Object();
+    }
+
+    @Override
+    public void run() {
+
+        initializeConsumer();
+
+        while (running) {
+            fetchDataFromKafka();
+        }
+
+        cleanUp();
+    }
+
+    /**
+     * Method for fetching data from Apache Kafka. Received data is handed
+     * to the internal buffer and any waiting readers are notified.
+     */
+    private void fetchDataFromKafka() {
+        if (consumer != null) {
+            if (!consumer.subscription().isEmpty()) {
+                try {
+                    List<byte[]> kafkaMsg = getMessagesBytes(consumer.poll(consumerTimeout));
+                    fillBufferAndNotifyWaits(kafkaMsg);
+                } catch (Throwable t) {
+                    Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, t);
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies received messages to class buffer and notifies Processor to grab
+     * the data.
+     *
+     * @param kafkaMsg Messages received from Kafka
+     */
+    private void fillBufferAndNotifyWaits(List<byte[]> kafkaMsg) {
+        synchronized (lock) {
+            buffer.addAll(kafkaMsg);
+            if (buffer.size() > 0) {
+                lock.notifyAll();
+            }
+        }
+    }
+
+    private void cleanUp() {
+        // clean resources
+        if (consumer != null) {
+            consumer.unsubscribe();
+            consumer.close();
+        }
+    }
+
+    private void initializeConsumer() {
+        // lazy instantiation
+        log.log(Level.INFO, "Instantiating Kafka consumer");
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(consumerProperties);
+            running = true;
+        }
+        consumer.subscribe(topics);
+    }
+
+    private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) {
+        Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator();
+        List<byte[]> ret = new ArrayList<>();
+        while (iterator.hasNext()) {
+            ret.add(iterator.next().value());
+        }
+        return ret;
+    }
+
+    void close() {
+        running = false;
+    }
+
+    List<byte[]> getKafkaMessages() {
+        synchronized (lock) {
+            if (buffer.isEmpty()) {
+                try {
+                    // block the call until new messages are received
+                    lock.wait();
+                } catch (InterruptedException ex) {
+                    Logger.getLogger(KafkaConsumerThread.class.getName()).log(Level.SEVERE, null, ex);
+                }
+            }
+            ArrayList<byte[]> ret = new ArrayList<>();
+            // copy buffer to return list
+            ret.addAll(buffer);
+            // clear message buffer
+            buffer.clear();
+            return ret;
+        }
+    }
+}
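
To make the intended lifecycle concrete, a hypothetical driver for the new
class (KafkaConsumerThread is package-private, so the sketch sits in the same
package; broker address, group id and topic below are placeholders, not values
from the commit):

    package org.apache.samoa.streams.kafka;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    class ConsumerThreadSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.setProperty("group.id", "sketch");                  // placeholder group

            KafkaConsumerThread thread = new KafkaConsumerThread(props, Arrays.asList("sketch-topic"), 1000);
            thread.start();                                 // run() subscribes and starts polling
            List<byte[]> batch = thread.getKafkaMessages(); // blocks until the poll loop buffers data
            System.out.println("received " + batch.size() + " messages");
            thread.close();                                 // drops the running flag; loop exits and cleans up
        }
    }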

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index 2b0b808..7079c58 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -104,7 +104,7 @@ public class KafkaEntranceProcessor implements EntranceProcessor {
 
     @Override
     public ContentEvent nextEvent() {
-        // assume this will never be called when buffer is empty!
+        // assume this will never be called when buffer is empty!        
         return this.deserializer.deserialize(buffer.remove(buffer.size() - 1));
     }
 
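
The comment kept by this hunk encodes a real contract: nextEvent() pops the
buffer unguarded, so callers must gate it on hasNext(), which is what refills
the buffer. A sketch of the expected calling loop (hypothetical; it assumes
EntranceProcessor and ContentEvent live in org.apache.samoa.core, as the
imports removed from the tests further down suggest):

    import org.apache.samoa.core.ContentEvent;
    import org.apache.samoa.core.EntranceProcessor;

    class EntranceLoopSketch {
        static void drain(EntranceProcessor processor) {
            while (processor.hasNext()) {                   // hasNext() refills the buffer
                ContentEvent event = processor.nextEvent(); // safe: buffer is non-empty here
                // ... hand the event to the topology
            }
        }
    }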

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index 0635877..75b5402 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -61,8 +61,7 @@ import java.util.logging.Logger;
  */
 class KafkaUtils {
 
-    // Consumer class for internal use to retrieve messages from Kafka
-    private transient KafkaConsumer<String, byte[]> consumer;
+    private transient KafkaConsumerThread kafkaConsumerThread;
 
     private transient KafkaProducer<String, byte[]> producer;
 
@@ -72,6 +71,7 @@ class KafkaUtils {
 
     // Timeout for Kafka Consumer    
     private long consumerTimeout;
+        
 
     /**
      * Class constructor
@@ -86,6 +86,10 @@ class KafkaUtils {
         this.consumerTimeout = consumerTimeout;
     }
 
+    /**
+     * Creates a new KafkaUtils instance from an existing one
+     * @param kafkaUtils Instance of KafkaUtils
+     */
     KafkaUtils(KafkaUtils kafkaUtils) {
         this.consumerProperties = kafkaUtils.consumerProperties;
         this.producerProperties = kafkaUtils.producerProperties;
@@ -93,25 +97,18 @@ class KafkaUtils {
     }
 
     /**
-     * Method used to initialize Kafka Consumer, i.e. instantiate it and
+     * Method used to initialize Kafka Consumer Thread, i.e. instantiate it and
      * subscribe to configured topic
      *
      * @param topics List of Kafka topics that consumer should subscribe to
      */
-    public void initializeConsumer(Collection<String> topics) {
-        // lazy instantiation
-        if (consumer == null) {
-            consumer = new KafkaConsumer<>(consumerProperties);
-        }
-        consumer.subscribe(topics);
-//        consumer.seekToBeginning(consumer.assignment());
+    public void initializeConsumer(Collection<String> topics) {
+        kafkaConsumerThread = new KafkaConsumerThread(consumerProperties, topics, consumerTimeout);
+        kafkaConsumerThread.start();
     }
 
     public void closeConsumer() {
-        if (consumer != null) {
-            consumer.unsubscribe();
-            consumer.close();
-        }
+        kafkaConsumerThread.close();
     }
 
     public void initializeProducer() {
@@ -135,27 +132,7 @@ class KafkaUtils {
      * or is not subscribed to any topic.
      */
     public List<byte[]> getKafkaMessages() throws Exception {
-
-        if (consumer != null) {
-            if (!consumer.subscription().isEmpty()) {
-                return getMessagesBytes(consumer.poll(consumerTimeout));
-            } else {
-                // TODO: do it more elegant way
-                throw new Exception("Consumer subscribed to no topics!");
-            }
-        } else {
-            // TODO: do more elegant way
-            throw new Exception("Consumer not initialised");
-        }
-    }
-
-    private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) {
-        Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator();
-        List<byte[]> ret = new ArrayList<>();
-        while (iterator.hasNext()) {
-            ret.add(iterator.next().value());
-        }
-        return ret;
+        return kafkaConsumerThread.getKafkaMessages();
     }
 
     public long sendKafkaMessage(String topic, byte[] message) {

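
Net effect of this file's change: KafkaUtils no longer polls Kafka itself, it
only starts, queries and stops the thread. A sketch of the resulting lifecycle
(same package, since KafkaUtils is package-private; the topic name is a
placeholder):

    package org.apache.samoa.streams.kafka;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    class KafkaUtilsLifecycleSketch {
        static void roundTrip(Properties consumerProps, Properties producerProps) throws Exception {
            KafkaUtils utils = new KafkaUtils(consumerProps, producerProps, 1000);
            utils.initializeConsumer(Arrays.asList("sketch-topic")); // creates and starts KafkaConsumerThread
            List<byte[]> messages = utils.getKafkaMessages();        // delegates to the thread; blocks until data arrives
            utils.closeConsumer();                                   // flags the poll loop to stop and clean up
        }
    }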
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
index a138763..bf45ffb 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
@@ -15,6 +15,25 @@
  */
 package org.apache.samoa.streams.kafka;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
@@ -51,7 +70,7 @@ import org.junit.Test;
 
 /**
  *
- * @author pwawrzyniak <your.name at your.org>
+ * @author pwawrzyniak
  */
 public class KafkaDestinationProcessorTest {
 
@@ -59,7 +78,7 @@ public class KafkaDestinationProcessorTest {
     private static final String BROKERHOST = "127.0.0.1";
     private static final String BROKERPORT = "9092";
     private static final String TOPIC = "test-kdp";
-    private static final int NUM_INSTANCES = 500;
+    private static final int NUM_INSTANCES = 11111;
     private static final int CONSUMER_TIMEOUT = 1000;
 
     private static KafkaServer kafkaServer;
@@ -94,7 +113,7 @@ public class KafkaDestinationProcessorTest {
     }
 
     @AfterClass
-    public static void tearDownClass() {
+    public static void tearDownClass() {        
         kafkaServer.shutdown();
         zkClient.close();
         zkServer.shutdown();
@@ -114,34 +133,28 @@ public class KafkaDestinationProcessorTest {
     public void testSendingData() throws InterruptedException, ExecutionException, TimeoutException {
 
         final Logger logger = Logger.getLogger(KafkaDestinationProcessorTest.class.getName());
-        Properties props = TestUtilsForKafka.getProducerProperties();
+        Properties props = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
         props.setProperty("auto.offset.reset", "earliest");
         KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, TOPIC, new KafkaJsonMapper(Charset.defaultCharset()));
         kdp.onCreate(1);
 
         final int[] i = {0};
-        
-//         prepare new thread for data receiveing
+
+        // prepare new thread for data receiving
         Thread th = new Thread(new Runnable() {
             @Override
             public void run() {
-                KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties());
+                KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT));
                 consumer.subscribe(Arrays.asList(TOPIC));
                 while (i[0] < NUM_INSTANCES) {
                     try {
                        ConsumerRecords<String, byte[]> cr = consumer.poll(CONSUMER_TIMEOUT);
-
                        Iterator<ConsumerRecord<String, byte[]>> it = cr.iterator();
                         while (it.hasNext()) {
                             ConsumerRecord<String, byte[]> record = it.next();
-                            logger.info(new String(record.value()));
-                            logger.log(Level.INFO, "Current read offset is: {0}", record.offset());
                             i[0]++;
                         }
-                        
-                        Thread.sleep(1);
-
-                    } catch (InterruptedException ex) {
+                    } catch (Exception ex) {
                        Logger.getLogger(KafkaDestinationProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
                     }
                 }
@@ -157,11 +170,11 @@ public class KafkaDestinationProcessorTest {
         for (z = 0; z < NUM_INSTANCES; z++) {
            InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
             kdp.process(event);
-            logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z});
-            Thread.sleep(5);
+//            logger.log(Level.INFO, "{0} {1}", new Object[]{"Sent item with id: ", z});
         }
+
         // wait for all instances to be read
-        Thread.sleep(100);        
+        Thread.sleep(2 * CONSUMER_TIMEOUT);
         assertEquals("Number of sent and received instances", z, i[0]);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
index bc2a11e..009a6a7 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -38,7 +38,6 @@ import com.google.gson.Gson;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
@@ -46,17 +45,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import mockit.Mocked;
-import mockit.Tested;
-import mockit.Expectations;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.Processor;
 import org.apache.samoa.learners.InstanceContentEvent;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import kafka.admin.AdminUtils;
@@ -72,36 +65,26 @@ import kafka.zk.EmbeddedZookeeper;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.samoa.instances.Attribute;
-import org.apache.samoa.instances.DenseInstance;
-import org.apache.samoa.instances.Instance;
-import org.apache.samoa.instances.Instances;
 import org.apache.samoa.instances.InstancesHeader;
-import org.apache.samoa.moa.core.FastVector;
-import org.apache.samoa.moa.core.InstanceExample;
-import org.apache.samoa.streams.InstanceStream;
 
 /**
  *
  * @author pwawrzyniak
  */
-//@Ignore
 public class KafkaEntranceProcessorTest {
 
-//    @Tested
-//    private KafkaEntranceProcessor kep;
-    private static final String ZKHOST = "10.255.251.202";             //10.255.251.202
-    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
-    private static final String BROKERPORT = "6667";           //6667, local: 9092
-    private static final String TOPIC = "samoa_test";                          //samoa_test, local: test
-    private static final int NUM_INSTANCES = 50;
-    
-    
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC_AVRO = "samoa_test-avro";
+    private static final String TOPIC_JSON = "samoa_test-json";
+    private static final int NUM_INSTANCES = 11111;
+
     private static KafkaServer kafkaServer;
     private static EmbeddedZookeeper zkServer;
     private static ZkClient zkClient;
     private static String zkConnect;
-    
+    private static int TIMEOUT = 1000;
 
     public KafkaEntranceProcessorTest() {
     }
@@ -110,30 +93,35 @@ public class KafkaEntranceProcessorTest {
     public static void setUpClass() throws IOException {
         // setup Zookeeper
         zkServer = new EmbeddedZookeeper();
-        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
+        zkConnect = ZKHOST + ":" + zkServer.port();
        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
         ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
 
         // setup Broker
-        /*Properties brokerProps = new Properties();
+        Properties brokerProps = new Properties();
         brokerProps.setProperty("zookeeper.connect", zkConnect);
         brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
         KafkaConfig config = new KafkaConfig(brokerProps);
         Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);*/
+        kafkaServer = TestUtils.createServer(config, mock);
 
-        // create topic
-        //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+        // create topics
+        AdminUtils.createTopic(zkUtils, TOPIC_AVRO, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TOPIC_JSON, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
 
     }
 
     @AfterClass
     public static void tearDownClass() {
-        //kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
+        try {
+            kafkaServer.shutdown();
+            zkClient.close();
+            zkServer.shutdown();
+        } catch (Exception ex) {
+            Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+        }
     }
 
     @Before
@@ -146,20 +134,23 @@ public class KafkaEntranceProcessorTest {
 
     }
 
-    /*@Test
-    public void testFetchingNewData() throws InterruptedException, ExecutionException, TimeoutException {
+    @Test
+    public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException {
 
         Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
-        Properties props = TestUtilsForKafka.getConsumerProperties();
+        logger.log(Level.INFO, "JSON");
+        logger.log(Level.INFO, "testFetchingNewDataWithJson");
+        Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
         props.setProperty("auto.offset.reset", "earliest");
-        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 10000, new KafkaJsonMapper(Charset.defaultCharset()));
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset()));
+
         kep.onCreate(1);
 
-//         prepare new thread for data producing
+        // prepare new thread for data producing
         Thread th = new Thread(new Runnable() {
             @Override
             public void run() {
-                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties());
+                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));
 
                 Random r = new Random();
                 InstancesHeader header = TestUtilsForKafka.generateHeader(10);
@@ -167,10 +158,9 @@ public class KafkaEntranceProcessorTest {
                 int i = 0;
                 for (i = 0; i < NUM_INSTANCES; i++) {
                     try {
-                        ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes());
-                        long stat = producer.send(record).get(10, TimeUnit.DAYS).offset();
-                        Thread.sleep(5);
-                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+                        InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+                        ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes());
+                        long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset();
                    } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
                     }
@@ -182,45 +172,44 @@ public class KafkaEntranceProcessorTest {
         th.start();
 
         int z = 0;
-        while (kep.hasNext() && z < NUM_INSTANCES) {
-            logger.log(Level.INFO, "{0} {1}", new Object[]{z++, kep.nextEvent().toString()});
-        }
+        while (z < NUM_INSTANCES && kep.hasNext()) {
+            InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
+            z++;
+//            logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()});
+        }
+
+        assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
+
+    }
 
-        assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
-      
-     
-    }*/
-    
     @Test
     public void testFetchingNewDataWithAvro() throws InterruptedException, ExecutionException, TimeoutException {
         Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
         logger.log(Level.INFO, "AVRO");
-       logger.log(Level.INFO, "testFetchingNewDataWithAvro");
-        Properties props = TestUtilsForKafka.getConsumerProperties();
+        logger.log(Level.INFO, "testFetchingNewDataWithAvro");
+        Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
         props.setProperty("auto.offset.reset", "earliest");
-        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 10000, new KafkaAvroMapper());
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_AVRO, TIMEOUT, new KafkaAvroMapper());
         kep.onCreate(1);
 
 //         prepare new thread for data producing
         Thread th = new Thread(new Runnable() {
             @Override
             public void run() {
-                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties());
+                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));
 
                 Random r = new Random();
                 InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-                KafkaAvroMapper avroMapper = new KafkaAvroMapper();
+
                 int i = 0;
                 for (i = 0; i < NUM_INSTANCES; i++) {
                     try {
-                       //byte[] data = avroMapper.serialize(TestUtilsForKafka.getData(r, 10, header));
-                       byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header));
-                       if(data == null)
-                               Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ("+i+")");
-                        ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, data);
-                        long stat = producer.send(record).get(10, TimeUnit.DAYS).offset();
-                        Thread.sleep(5);
-                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent avro message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+                        byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header));
+                        if (data == null) {
+                            Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ({0})", i);
+                        }
+                        ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_AVRO, data);
+                        long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset();
                    } catch (InterruptedException | ExecutionException | TimeoutException ex) {
                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
                     }
@@ -232,34 +221,12 @@ public class KafkaEntranceProcessorTest {
         th.start();
 
         int z = 0;
-        while (kep.hasNext() && z < NUM_INSTANCES) {
-               InstanceContentEvent event = (InstanceContentEvent)kep.nextEvent();
-            logger.log(Level.INFO, "{0} {1}", new Object[]{z++, event.getInstance().toString()});
-        }
+        while (z < NUM_INSTANCES && kep.hasNext()) {
+            InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
+            z++;
+//            logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()});
+        }
 
-        assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
-      
-     
+        assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
     }
-
-//    private Properties getProducerProperties() {
-//        Properties producerProps = new Properties();
-////                props.setProperty("zookeeper.connect", zkConnect);
-//        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-//        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-//        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-////        producerProps.setProperty("group.id", "test");
-//        return producerProps;
-//    }
-//
-//    private Properties getConsumerProperties() {
-//        Properties consumerProps = new Properties();
-//        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-//        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-//        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-////        consumerProps.setProperty("group.id", "test");
-//        consumerProps.setProperty("group.id", "group0");
-//        consumerProps.setProperty("client.id", "consumer0");
-//        return consumerProps;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
index 31f34fb..08aae11 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
@@ -15,11 +15,14 @@
  */
 package org.apache.samoa.streams.kafka;
 
+import com.google.gson.Gson;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -44,6 +47,9 @@ import kafka.utils.TestUtils;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.InstancesHeader;
 
 /*
  * #%L
@@ -72,11 +78,11 @@ import kafka.zk.EmbeddedZookeeper;
 @Ignore
 public class KafkaTaskTest {
        
-    private static final String ZKHOST = "10.255.251.202";             //10.255.251.202
-    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
-    private static final String BROKERPORT = "6667";           //6667, local: 9092
+    private static final String ZKHOST = "127.0.0.1";          //10.255.251.202
+    private static final String BROKERHOST = "127.0.0.1";      //10.255.251.214
+    private static final String BROKERPORT = "9092";           //6667, local: 9092
     private static final String TOPIC = "samoa_test";                          //samoa_test, local: test
-    private static final int NUM_INSTANCES = 500;
+    private static final int NUM_INSTANCES = 125922;
     
     
     private static KafkaServer kafkaServer;
@@ -87,10 +93,10 @@ public class KafkaTaskTest {
     @BeforeClass
     public static void setUpClass() throws IOException {
         // setup Zookeeper
-        zkServer = new EmbeddedZookeeper();
-        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+//        zkServer = new EmbeddedZookeeper();
+//        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
+//        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+//        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
 
         // setup Broker
         /*Properties brokerProps = new Properties();
@@ -109,8 +115,8 @@ public class KafkaTaskTest {
     @AfterClass
     public static void tearDownClass() {
         //kafkaServer.shutdown(); 
-        zkClient.close();
-        zkServer.shutdown();
+//        zkClient.close();
+//        zkServer.shutdown();
     }
 
     @Before
@@ -127,12 +133,38 @@ public class KafkaTaskTest {
     public void testKafkaTask() throws InterruptedException, ExecutionException, TimeoutException {
         Logger logger = Logger.getLogger(KafkaTaskTest.class.getName());
         logger.log(Level.INFO, "KafkaTask");
-        Properties producerProps = TestUtilsForKafka.getProducerProperties();
-        Properties consumerProps = TestUtilsForKafka.getConsumerProperties();
+        Properties producerProps = TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
+        Properties consumerProps = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
 
         KafkaTask task = new KafkaTask(producerProps, consumerProps, "kafkaTaskTest", 10000, new KafkaJsonMapper(Charset.defaultCharset()), new KafkaJsonMapper(Charset.defaultCharset()));
         task.setFactory(new SimpleComponentFactory());
         task.init();
         SimpleEngine.submitTopology(task.getTopology());
+        
+        Thread th = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT));
+
+                Random r = new Random();
+                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+                Gson gson = new Gson();
+                int i = 0;
+                for (i = 0; i < NUM_INSTANCES; i++) {
+                    try {
+                        ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes());
+                        long stat = producer.send(record).get(10, TimeUnit.DAYS).offset();
+//                        Thread.sleep(5);
+                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+                    } catch (InterruptedException | ExecutionException | TimeoutException ex) {
+                        Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, null, ex);
+                    }
+                }
+                producer.flush();
+                producer.close();
+            }
+        });
+        th.start();
+        
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
index 7c1c7c0..e2b36fd 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -34,8 +34,6 @@ package org.apache.samoa.streams.kafka;
  * limitations under the License.
  * #L%
  */
-
-
 import com.google.gson.Gson;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -47,7 +45,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -70,32 +67,31 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.samoa.instances.InstancesHeader;
 import org.junit.After;
 import org.junit.AfterClass;
+import static org.junit.Assert.*;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 /**
  *
  * @author pwawrzyniak
  */
-@Ignore
 public class KafkaUtilsTest {
 
-    private static final String ZKHOST = "10.255.251.202";             //10.255.251.202
-    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
-    private static final String BROKERPORT = "6667";           //6667, local: 9092
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
     private static final String TOPIC_R = "test-r";
     private static final String TOPIC_S = "test-s";
+    private static final int NUM_INSTANCES = 50;
 
     private static KafkaServer kafkaServer;
     private static EmbeddedZookeeper zkServer;
     private static ZkClient zkClient;
     private static String zkConnect;
 
-    private Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
-    private long CONSUMER_TIMEOUT = 1000;
+    private static final Logger logger = Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
+    private final long CONSUMER_TIMEOUT = 1000;
 
     public KafkaUtilsTest() {
     }
@@ -104,29 +100,29 @@ public class KafkaUtilsTest {
     public static void setUpClass() throws IOException {
         // setup Zookeeper
         zkServer = new EmbeddedZookeeper();
-        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
+        zkConnect = ZKHOST + ":" + zkServer.port();
        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
         ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
 
         // setup Broker
-        /*Properties brokerProps = new Properties();
+        Properties brokerProps = new Properties();
         brokerProps.setProperty("zookeeper.connect", zkConnect);
         brokerProps.setProperty("broker.id", "0");
        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
         KafkaConfig config = new KafkaConfig(brokerProps);
         Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);*/
+        kafkaServer = TestUtils.createServer(config, mock);
 
         // create topics
-        //AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-        //AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
 
     }
 
     @AfterClass
     public static void tearDownClass() {
-        //kafkaServer.shutdown();
+        kafkaServer.shutdown();
         zkClient.close();
         zkServer.shutdown();
     }
@@ -146,13 +142,19 @@ public class KafkaUtilsTest {
     public void testInitializeConsumer() throws Exception {
         logger.log(Level.INFO, "initializeConsumer");
         Collection<String> topics = Arrays.asList(TOPIC_R);
-        KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT);
+        KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT);
         assertNotNull(instance);
 
         instance.initializeConsumer(topics);
+        Thread.sleep(1000);
+        instance.closeConsumer();
+
+        Thread.sleep(CONSUMER_TIMEOUT);
 
-        assertNotNull(instance.getKafkaMessages());
+        instance.initializeConsumer(topics);
+        Thread.sleep(1000);
         instance.closeConsumer();
+        assertTrue(true);
     }
 
     /**
@@ -162,14 +164,17 @@ public class KafkaUtilsTest {
     public void testGetKafkaMessages() throws Exception {
         logger.log(Level.INFO, "getKafkaMessages");
         Collection<String> topics = Arrays.asList(TOPIC_R);
-        KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties(), CONSUMER_TIMEOUT);
+        KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT);
         assertNotNull(instance);
 
         logger.log(Level.INFO, "Initialising consumer");
         instance.initializeConsumer(topics);
 
         logger.log(Level.INFO, "Produce data");
-        List expResult = sendAndGetMessages(50);
+        List expResult = sendAndGetMessages(NUM_INSTANCES);
+
+        logger.log(Level.INFO, "Wait a moment");
+        Thread.sleep(CONSUMER_TIMEOUT);
 
         logger.log(Level.INFO, "Get results from Kafka");
         List<byte[]> result = instance.getKafkaMessages();
@@ -180,7 +185,7 @@ public class KafkaUtilsTest {
 
     private List<byte[]> sendAndGetMessages(int maxNum) throws InterruptedException, ExecutionException, TimeoutException {
         List<byte[]> ret;
-        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test"))) {
+        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test", BROKERHOST, BROKERPORT))) {
             ret = new ArrayList<>();
             Random r = new Random();
             InstancesHeader header = TestUtilsForKafka.generateHeader(10);
@@ -190,25 +195,28 @@ public class KafkaUtilsTest {
                 ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
                 ret.add(record.value());
                 producer.send(record);
-            }   producer.flush();
+            }
+            producer.flush();
         }
         return ret;
     }
 
     /**
      * Test of sendKafkaMessage method, of class KafkaUtils.
+     *
+     * @throws java.lang.InterruptedException
      */
     @Test
-    public void testSendKafkaMessage() {
+    public void testSendKafkaMessage() throws InterruptedException {
         logger.log(Level.INFO, "sendKafkaMessage");
 
         logger.log(Level.INFO, "Initialising producer");
-        KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(), TestUtilsForKafka.getProducerProperties("rcv-test"), CONSUMER_TIMEOUT);
+        KafkaUtils instance = new KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST, BROKERPORT), CONSUMER_TIMEOUT);
         instance.initializeProducer();
 
         logger.log(Level.INFO, "Initialising consumer");
         KafkaConsumer<String, byte[]> consumer;
-        consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties());
+        consumer = new KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT));
         consumer.subscribe(Arrays.asList(TOPIC_S));
 
         logger.log(Level.INFO, "Produce data");
@@ -216,11 +224,13 @@ public class KafkaUtilsTest {
         Random r = new Random();
         InstancesHeader header = TestUtilsForKafka.generateHeader(10);
         Gson gson = new Gson();
-        for (int i = 0; i < 50; i++) {
+        for (int i = 0; i < NUM_INSTANCES; i++) {
            byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, header)).getBytes();
             sent.add(val);
             instance.sendKafkaMessage(TOPIC_S, val);
         }
+        // wait for Kafka a bit :)
+        Thread.sleep(CONSUMER_TIMEOUT);
 
         logger.log(Level.INFO, "Get results from Kafka");
        ConsumerRecords<String, byte[]> records = consumer.poll(CONSUMER_TIMEOUT);

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/81d29599/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
index 8d85fd7..87ab16c 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
@@ -48,14 +48,12 @@ import org.apache.samoa.moa.core.FastVector;
 
 /**
  *
- * @author pwawrzyniak <your.name at your.org>
+ * @author pwawrzyniak
  */
 public class TestUtilsForKafka {
 
-    private static final String ZKHOST = "10.255.251.202";             //10.255.251.202
-    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
-    private static final String BROKERPORT = "6667";           //6667, local: 9092
-    private static final String TOPIC = "samoa_test";                          //samoa_test, local: test
+//    private static final String BROKERHOST = "127.0.0.1";
+//    private static final String BROKERPORT = "9092";
 
     protected static InstanceContentEvent getData(Random instanceRandom, int numAtts, InstancesHeader header) {
         double[] attVals = new double[numAtts + 1];
@@ -63,8 +61,7 @@ public class TestUtilsForKafka {
         double sumWeights = 0.0;
         for (int i = 0; i < numAtts; i++) {
             attVals[i] = instanceRandom.nextDouble();
-//            sum += this.weights[i] * attVals[i];
-//            sumWeights += this.weights[i];
+
         }
         int classLabel;
         if (sum >= sumWeights * 0.5) {
@@ -98,8 +95,8 @@ public class TestUtilsForKafka {
     }
 
     
-        protected static Properties getProducerProperties() {
-        return getProducerProperties("test");
+        protected static Properties getProducerProperties(String BROKERHOST, String BROKERPORT) {
+        return getProducerProperties("test", BROKERHOST, BROKERPORT);
     }
     
     /**
@@ -107,7 +104,7 @@ public class TestUtilsForKafka {
      * @param clientId
      * @return
      */
-    protected static Properties getProducerProperties(String clientId) {
+    protected static Properties getProducerProperties(String clientId, String BROKERHOST, String BROKERPORT) {
         Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
@@ -117,7 +114,7 @@ public class TestUtilsForKafka {
         return producerProps;
     }
 
-    protected static Properties getConsumerProperties() {
+    protected static Properties getConsumerProperties(String BROKERHOST, String BROKERPORT) {
         Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
         consumerProps.put("enable.auto.commit", "true");
@@ -126,11 +123,10 @@ public class TestUtilsForKafka {
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.setProperty("group.id", "test");
         consumerProps.setProperty("auto.offset.reset", "earliest");
-        //consumerProps.setProperty("client.id", "consumer0");
         return consumerProps;
     }
     
-    protected static Properties getConsumerProducerProperties() {
+    protected static Properties getConsumerProducerProperties(String BROKERHOST, String BROKERPORT) {
         Properties props = new Properties();
         props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
         props.put("enable.auto.commit", "true");

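For completeness, how the reworked helpers are meant to be called from the
tests (illustrative; the host and port literals are placeholders):

    Properties producerProps = TestUtilsForKafka.getProducerProperties("127.0.0.1", "9092");
    Properties consumerProps = TestUtilsForKafka.getConsumerProperties("127.0.0.1", "9092");
    // or with an explicit client id:
    Properties named = TestUtilsForKafka.getProducerProperties("my-client", "127.0.0.1", "9092");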