Changes in tests (removing unused tests and classes, minor refactoring)

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/a5cda69c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/a5cda69c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/a5cda69c

Branch: refs/heads/master
Commit: a5cda69cde10eade36ea192854143ea482314e8a
Parents: 84c9487
Author: pwawrzyniak <[email protected]>
Authored: Wed Jul 5 10:28:19 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../apache/samoa/streams/kafka/KafkaTask.java   | 291 +++++------
 .../samoa/streams/kafka/KafkaTaskTest.java      | 153 ------
 .../samoa/streams/kafka/KafkaUtilsTest.java     | 490 ++++++++++---------
 .../kafka/topology/SimpleComponentFactory.java  |  53 --
 .../streams/kafka/topology/SimpleEngine.java    |  37 --
 .../topology/SimpleEntranceProcessingItem.java  |  33 --
 .../kafka/topology/SimpleProcessingItem.java    |  87 ----
 .../streams/kafka/topology/SimpleStream.java    |  95 ----
 .../streams/kafka/topology/SimpleTopology.java  |  46 --
 9 files changed, 394 insertions(+), 891 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
index 0c8f138..b3d638f 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
@@ -1,144 +1,147 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Properties;
-
-import org.apache.samoa.tasks.Task;
-import org.apache.samoa.topology.ComponentFactory;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.topology.Topology;
-import org.apache.samoa.topology.TopologyBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.IntOption;
-import com.github.javacliparser.StringOption;
-
-/**
- * Kafka task
- * 
- * @author Jakub Jankowski
- * @version 0.5.0-incubating-SNAPSHOT
- * @since 0.5.0-incubating
- *
- */
-
-public class KafkaTask implements Task, Configurable {
-
-       private static final long serialVersionUID = 3984474041982397855L;
-       private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
-       
-       //czy identyczne dla enterance i destination?
-       Properties producerProps;
-       Properties consumerProps;
-       int timeout;
-       private final KafkaDeserializer deserializer;
-       private final KafkaSerializer serializer;
-       private final String topic;
-
-       private TopologyBuilder builder;
-       private Topology kafkaTopology;
-
-       public IntOption kafkaParallelismOption = new 
IntOption("parallelismOption", 'p',
-                       "Number of destination Processors", 1, 1, 
Integer.MAX_VALUE);
-
-       public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
-                       "KafkaTask" + new 
SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
-
-       /**
-     * Class constructor
-     * @param props Properties of Kafka Producer and Consumer
-     * @see <a 
href="http://kafka.apache.org/documentation/#producerconfigs";>Kafka Producer 
configuration</a>
-     * @see <a 
href="http://kafka.apache.org/documentation/#consumerconfigs";>Kafka Consumer 
configuration</a>
-     * @param topic Topic to which destination processor will write into
-     * @param timeout Timeout used when polling Kafka for new messages
-     * @param serializer Implementation of KafkaSerializer that handles 
arriving data serialization
-     * @param serializer Implementation of KafkaDeserializer that handles 
arriving data deserialization
-     */
-       public KafkaTask(Properties producerProps, Properties consumerProps, 
String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer 
deserializer) {
-               this.producerProps = producerProps;
-               this.consumerProps = consumerProps;
-               this.deserializer = deserializer;
-               this.serializer = serializer;
-               this.topic = topic;
-               this.timeout = timeout;
-       }
-
-       @Override
-       public void init() {
-               logger.info("Invoking init");
-               if (builder == null) {
-                       builder = new TopologyBuilder();
-                       logger.info("Successfully instantiating 
TopologyBuilder");
-
-                       builder.initTopology(evaluationNameOption.getValue());
-                       logger.info("Successfully initializing SAMOA topology 
with name {}", evaluationNameOption.getValue());
-               }
-               
-               // create enterance processor
-               KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer);
-               builder.addEntranceProcessor(sourceProcessor);
-               
-               // create stream
-               Stream stream = builder.createStream(sourceProcessor);
-               
-               // create destination processor
-               KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, topic, serializer);
-               builder.addProcessor(destProcessor, 
kafkaParallelismOption.getValue());
-               builder.connectInputShuffleStream(stream, destProcessor);
-               
-               // build topology
-               kafkaTopology = builder.build();
-           logger.info("Successfully built the topology");
-       }
-
-       @Override
-       public Topology getTopology() {
-               return kafkaTopology;
-       }
-
-       @Override
-       public void setFactory(ComponentFactory factory) {
-               logger.info("Invoking setFactory: "+factory.toString());
-               builder = new TopologyBuilder(factory);
-           logger.info("Successfully instantiating TopologyBuilder");
-
-           builder.initTopology(evaluationNameOption.getValue());
-           logger.info("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
-
-       }
-
-}
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * Kafka task
+ * 
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ *
+ */
+
+public class KafkaTask implements Task, Configurable {
+
+       private static final long serialVersionUID = 3984474041982397855L;
+       private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
+       
+       //czy identyczne dla enterance i destination?
+       Properties producerProps;
+       Properties consumerProps;
+       int timeout;
+       private final KafkaDeserializer deserializer;
+       private final KafkaSerializer serializer;       
+
+       private TopologyBuilder builder;
+       private Topology kafkaTopology;
+
+       public IntOption kafkaParallelismOption = new 
IntOption("parallelismOption", 'p',
+                       "Number of destination Processors", 1, 1, 
Integer.MAX_VALUE);
+
+       public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
+                       "KafkaTask" + new 
SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+        
+    private final String inTopic;
+    private final String outTopic;
+
+       /**
+     * Class constructor
+     * @param props Properties of Kafka Producer and Consumer
+     * @see <a 
href="http://kafka.apache.org/documentation/#producerconfigs";>Kafka Producer 
configuration</a>
+     * @see <a 
href="http://kafka.apache.org/documentation/#consumerconfigs";>Kafka Consumer 
configuration</a>
+     * @param topic Topic to which destination processor will write into
+     * @param timeout Timeout used when polling Kafka for new messages
+     * @param serializer Implementation of KafkaSerializer that handles 
arriving data serialization
+     * @param serializer Implementation of KafkaDeserializer that handles 
arriving data deserialization
+     */
+       public KafkaTask(Properties producerProps, Properties consumerProps, 
String inTopic, String outTopic, int timeout, KafkaSerializer serializer, 
KafkaDeserializer deserializer) {
+               this.producerProps = producerProps;
+               this.consumerProps = consumerProps;
+               this.deserializer = deserializer;
+               this.serializer = serializer;
+               this.inTopic = inTopic;
+                this.outTopic = outTopic;
+               this.timeout = timeout;
+       }
+
+       @Override
+       public void init() {
+               logger.info("Invoking init");
+               if (builder == null) {
+                       builder = new TopologyBuilder();
+                       logger.info("Successfully instantiating 
TopologyBuilder");
+
+                       builder.initTopology(evaluationNameOption.getValue());
+                       logger.info("Successfully initializing SAMOA topology 
with name {}", evaluationNameOption.getValue());
+               }
+               
+               // create enterance processor
+               KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer);
+               builder.addEntranceProcessor(sourceProcessor);
+               
+               // create stream
+               Stream stream = builder.createStream(sourceProcessor);
+               
+               // create destination processor
+               KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, outTopic, serializer);
+               builder.addProcessor(destProcessor, 
kafkaParallelismOption.getValue());
+               builder.connectInputShuffleStream(stream, destProcessor);
+               
+               // build topology
+               kafkaTopology = builder.build();
+           logger.info("Successfully built the topology");
+       }
+
+       @Override
+       public Topology getTopology() {
+               return kafkaTopology;
+       }
+
+       @Override
+       public void setFactory(ComponentFactory factory) {
+               logger.info("Invoking setFactory: "+factory.toString());
+               builder = new TopologyBuilder(factory);
+           logger.info("Successfully instantiating TopologyBuilder");
+
+           builder.initTopology(evaluationNameOption.getValue());
+           logger.info("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
deleted file mode 100644
index adecac1..0000000
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-
-import org.I0Itec.zkclient.ZkClient;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import kafka.server.KafkaServer;
-import kafka.zk.EmbeddedZookeeper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.samoa.instances.InstancesHeader;
-import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory;
-import org.apache.samoa.streams.kafka.topology.SimpleEngine;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-/**
- *
- * @author Jakub Jankowski
- */
-@Ignore
-public class KafkaTaskTest {
-
-    private static final String ZKHOST = "127.0.0.1";//10.255.251.202";        
        //10.255.251.202
-    private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214";   
//10.255.251.214
-    private static final String BROKERPORT = "9092";           //6667, local: 
9092
-    private static final String TOPIC = "samoa_test";                          
//samoa_test, local: test
-    private static final int NUM_INSTANCES = 125922;
-
-    private static KafkaServer kafkaServer;
-    private static EmbeddedZookeeper zkServer;
-    private static ZkClient zkClient;
-    private static String zkConnect;
-
-    @BeforeClass
-    public static void setUpClass() throws IOException {
-        // setup Zookeeper
-//        zkServer = new EmbeddedZookeeper();
-//        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
-//        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
-//        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        /*Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
-        KafkaConfig config = new KafkaConfig(brokerProps);
-        Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);*/
-        // create topic
-        //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-    }
-
-    @AfterClass
-    public static void tearDownClass() {
-        //kafkaServer.shutdown(); 
-//        zkClient.close();
-//        zkServer.shutdown();
-    }
-
-    @Before
-    public void setUp() throws IOException {
-
-    }
-
-    @After
-    public void tearDown() {
-
-    }
-
-    @Test
-    public void testKafkaTask() throws InterruptedException, 
ExecutionException, TimeoutException {
-        Logger logger = Logger.getLogger(KafkaTaskTest.class.getName());
-        logger.log(Level.INFO, "KafkaTask");
-        Properties producerProps = 
TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
-        Properties consumerProps = 
TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
-
-        KafkaTask task = new KafkaTask(producerProps, consumerProps, 
"kafkaTaskTest", 10000, new OosTestSerializer(), new OosTestSerializer());
-        task.setFactory(new SimpleComponentFactory());
-        task.init();
-        SimpleEngine.submitTopology(task.getTopology());
-
-        Thread th = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, 
BROKERPORT));
-
-                Random r = new Random();
-                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-                OosTestSerializer serializer = new OosTestSerializer();
-                int i = 0;
-                for (i = 0; i < NUM_INSTANCES; i++) {
-                    try {
-                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC, serializer.serialize(TestUtilsForKafka.getData(r, 10, 
header)));
-                        long stat = producer.send(record).get(10, 
TimeUnit.DAYS).offset();
-//                        Thread.sleep(5);
-                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
-                    } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
-                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
-                    }
-                }
-                producer.flush();
-                producer.close();
-            }
-        });
-        th.start();
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
index 5dc4542..186d97b 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -1,243 +1,247 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import com.google.gson.Gson;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.utils.Time;
-import org.apache.samoa.instances.InstancesHeader;
-import org.junit.After;
-import org.junit.AfterClass;
-import static org.junit.Assert.*;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- *
- * @author pwawrzyniak
- */
-public class KafkaUtilsTest {
-
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private static final String TOPIC_R = "test-r";
-    private static final String TOPIC_S = "test-s";
-    private static final int NUM_INSTANCES = 50;
-
-    private static KafkaServer kafkaServer;
-    private static EmbeddedZookeeper zkServer;
-    private static ZkClient zkClient;
-    private static String zkConnect;
-
-    private static final Logger logger = 
Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
-    private final long CONSUMER_TIMEOUT = 1500;
-
-    public KafkaUtilsTest() {
-    }
-
-    @BeforeClass
-    public static void setUpClass() throws IOException {
-        // setup Zookeeper
-        zkServer = new EmbeddedZookeeper();
-        zkConnect = ZKHOST + ":" + zkServer.port();
-        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
-        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
-        KafkaConfig config = new KafkaConfig(brokerProps);
-        Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
-
-        // create topics
-        AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-        AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-
-    }
-
-    @AfterClass
-    public static void tearDownClass() {
-        kafkaServer.shutdown();
-        zkClient.close();
-        zkServer.shutdown();
-    }
-
-    @Before
-    public void setUp() {
-    }
-
-    @After
-    public void tearDown() {
-    }
-
-    /**
-     * Test of initializeConsumer method, of class KafkaUtils.
-     */
-    @Test
-    public void testInitializeConsumer() throws Exception {
-        logger.log(Level.INFO, "initializeConsumer");
-        Collection<String> topics = Arrays.asList(TOPIC_R);
-        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), 
TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), 
CONSUMER_TIMEOUT);
-        assertNotNull(instance);
-
-        instance.initializeConsumer(topics);
-        Thread.sleep(1000);
-        instance.closeConsumer();
-
-        Thread.sleep(CONSUMER_TIMEOUT);
-
-        instance.initializeConsumer(topics);
-        Thread.sleep(1000);
-        instance.closeConsumer();
-        assertTrue(true);
-    }
-
-    /**
-     * Test of getKafkaMessages method, of class KafkaUtils.
-     */
-    @Test
-    public void testGetKafkaMessages() throws Exception {
-        logger.log(Level.INFO, "getKafkaMessages");
-        Collection<String> topics = Arrays.asList(TOPIC_R);
-        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), 
TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT), 
CONSUMER_TIMEOUT);
-        assertNotNull(instance);
-
-        logger.log(Level.INFO, "Initialising consumer");
-        instance.initializeConsumer(topics);
-
-        logger.log(Level.INFO, "Produce data");
-        List expResult = sendAndGetMessages(NUM_INSTANCES);
-
-        logger.log(Level.INFO, "Wait a moment");
-        Thread.sleep(CONSUMER_TIMEOUT);
-
-        logger.log(Level.INFO, "Get results from Kafka");
-        List<byte[]> result = instance.getKafkaMessages();
-
-        assertArrayEquals(expResult.toArray(), result.toArray());
-        instance.closeConsumer();
-    }
-
-    private List<byte[]> sendAndGetMessages(int maxNum) throws 
InterruptedException, ExecutionException, TimeoutException {
-        List<byte[]> ret;
-        try (KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test",BROKERHOST,BROKERPORT)))
 {
-            ret = new ArrayList<>();
-            Random r = new Random();
-            InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-            Gson gson = new Gson();
-            int i = 0;
-            for (i = 0; i < maxNum; i++) {
-                ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
-                ret.add(record.value());
-                producer.send(record);
-            }
-            producer.flush();
-        }
-        return ret;
-    }
-
-    /**
-     * Test of sendKafkaMessage method, of class KafkaUtils.
-     *
-     * @throws java.lang.InterruptedException
-     */
-    @Test
-    public void testSendKafkaMessage() throws InterruptedException {
-        logger.log(Level.INFO, "sendKafkaMessage");
-
-        logger.log(Level.INFO, "Initialising producer");
-        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT), 
TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST,BROKERPORT), 
CONSUMER_TIMEOUT);
-        instance.initializeProducer();
-
-        logger.log(Level.INFO, "Initialising consumer");
-        KafkaConsumer<String, byte[]> consumer;
-        consumer = new 
KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT));
-        consumer.subscribe(Arrays.asList(TOPIC_S));
-
-        logger.log(Level.INFO, "Produce data");
-        List<byte[]> sent = new ArrayList<>();
-        Random r = new Random();
-        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-        Gson gson = new Gson();
-        for (int i = 0; i < NUM_INSTANCES; i++) {
-            byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes();
-            sent.add(val);
-            instance.sendKafkaMessage(TOPIC_S, val);
-        }
-        // wait for Kafka a bit :)
-        Thread.sleep(2*CONSUMER_TIMEOUT);
-
-        logger.log(Level.INFO, "Get results from Kafka");
-        ConsumerRecords<String, byte[]> records = 
consumer.poll(CONSUMER_TIMEOUT);
-        Iterator<ConsumerRecord<String, byte[]>> it = records.iterator();
-        List<byte[]> consumed = new ArrayList<>();
-        while (it.hasNext()) {
-            consumed.add(it.next().value());
-        }
-        consumer.close();
-
-        assertArrayEquals(sent.toArray(), consumed.toArray());
-    }
-
-}
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
import com.google.gson.Gson;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;

import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Time;
import org.apache.samoa.instances.InstancesHeader;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.*;
+
+/**
+ *
+ * @author pwawrzyniak
+ */
+public class KafkaUtilsTest {
+
+    private static final String ZKHOST = "127.0.0.1";
+    private static final String BROKERHOST = "127.0.0.1";
+    private static final String BROKERPORT = "9092";
+    private static final String TOPIC_R = "test-r";
+    private static final String TOPIC_S = "test-s";
+    private static final int NUM_INSTANCES = 50;
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+
+    private static final Logger logger = 
Logger.getLogger(KafkaUtilsTest.class.getCanonicalName());
+    private final long CONSUMER_TIMEOUT = 1500;
+
+    public KafkaUtilsTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // create topics
+        AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        kafkaServer.shutdown();
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() {
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Test of initializeConsumer method, of class KafkaUtils.
+     */
+    @Test
+    public void testInitializeConsumer() throws Exception {
+        logger.log(Level.INFO, "initializeConsumer");
+        Collection<String> topics = Arrays.asList(TOPIC_R);
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), 
TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), 
CONSUMER_TIMEOUT);
+        assertNotNull(instance);
+
+        instance.initializeConsumer(topics);
+        Thread.sleep(1000);
+        instance.closeConsumer();
+
+        Thread.sleep(CONSUMER_TIMEOUT);
+
+        instance.initializeConsumer(topics);
+        Thread.sleep(1000);
+        instance.closeConsumer();
+        assertTrue(true);
+    }
+
+    /**
+     * Test of getKafkaMessages method, of class KafkaUtils.
+     */
+    @Test
+    public void testGetKafkaMessages() throws Exception {
+        logger.log(Level.INFO, "getKafkaMessages");
+        Collection<String> topics = Arrays.asList(TOPIC_R);
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), 
TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT), 
CONSUMER_TIMEOUT);
+        assertNotNull(instance);
+
+        logger.log(Level.INFO, "Initialising consumer");
+        instance.initializeConsumer(topics);
+
+        logger.log(Level.INFO, "Produce data");
+        List expResult = sendAndGetMessages(NUM_INSTANCES);
+
+        logger.log(Level.INFO, "Wait a moment");
+        Thread.sleep(CONSUMER_TIMEOUT);
+
+        logger.log(Level.INFO, "Get results from Kafka");
+        List<byte[]> result = instance.getKafkaMessages();
+
+        assertArrayEquals(expResult.toArray(), result.toArray());
+        instance.closeConsumer();
+    }
+
+    private List<byte[]> sendAndGetMessages(int maxNum) throws 
InterruptedException, ExecutionException, TimeoutException {
+        List<byte[]> ret;
+        try (KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties("sendM-test", 
BROKERHOST, BROKERPORT))) {
+            ret = new ArrayList<>();
+            Random r = new Random();
+            InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+            Gson gson = new Gson();
+            int i = 0;
+            for (i = 0; i < maxNum; i++) {
+                ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_R, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
+                ret.add(record.value());
+                producer.send(record);
+            }
+            producer.flush();
+        }
+        return ret;
+    }
+
+    /**
+     * Test of sendKafkaMessage method, of class KafkaUtils.
+     *
+     * @throws java.lang.InterruptedException
+     */
+    @Test
+    public void testSendKafkaMessage() throws InterruptedException {
+        logger.log(Level.INFO, "sendKafkaMessage");
+
+        logger.log(Level.INFO, "Initialising producer");
+        KafkaUtils instance = new 
KafkaUtils(TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT), 
TestUtilsForKafka.getProducerProperties("rcv-test", BROKERHOST, BROKERPORT), 
CONSUMER_TIMEOUT);
+        instance.initializeProducer();
+
+        logger.log(Level.INFO, "Initialising consumer");
+        KafkaConsumer<String, byte[]> consumer;
+        consumer = new 
KafkaConsumer<>(TestUtilsForKafka.getConsumerProperties(BROKERHOST, 
BROKERPORT));
+        consumer.subscribe(Arrays.asList(TOPIC_S));
+
+        logger.log(Level.INFO, "Produce data");
+        List<byte[]> sent = new ArrayList<>();
+        Random r = new Random();
+        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+        Gson gson = new Gson();
+        for (int i = 0; i < NUM_INSTANCES; i++) {
+            byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes();
+            sent.add(val);
+            instance.sendKafkaMessage(TOPIC_S, val);
+        }
+        // wait for Kafka a bit :)
+        Thread.sleep(2 * CONSUMER_TIMEOUT);
+
+        logger.log(Level.INFO, "Get results from Kafka");
+        
+        List<byte[]> consumed = new ArrayList<>();
+        
+        while (consumed.size() != sent.size()) {
+            ConsumerRecords<String, byte[]> records = 
consumer.poll(CONSUMER_TIMEOUT);
+            Iterator<ConsumerRecord<String, byte[]>> it = records.iterator();
+            while (it.hasNext()) {
+                consumed.add(it.next().value());
+            }
+        }
+        consumer.close();
+
+        assertArrayEquals(sent.toArray(), consumed.toArray());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
deleted file mode 100644
index 202833e..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.core.Processor;
-import org.apache.samoa.topology.ComponentFactory;
-import org.apache.samoa.topology.EntranceProcessingItem;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.topology.ProcessingItem;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.topology.Topology;
-
-public class SimpleComponentFactory implements ComponentFactory {
-
-  public ProcessingItem createPi(Processor processor, int paralellism) {
-    return new SimpleProcessingItem(processor, paralellism);
-  }
-
-  public ProcessingItem createPi(Processor processor) {
-    return this.createPi(processor, 1);
-  }
-
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
-    return new SimpleEntranceProcessingItem(processor);
-  }
-
-  public Stream createStream(IProcessingItem sourcePi) {
-    return new SimpleStream(sourcePi);
-  }
-
-  public Topology createTopology(String topoName) {
-    return new SimpleTopology(topoName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
deleted file mode 100644
index 338444b..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.topology.Topology;
-
-public class SimpleEngine {
-
-  public static void submitTopology(Topology topology) {
-    SimpleTopology simpleTopology = (SimpleTopology) topology;
-    simpleTopology.run();
-    // runs until completion
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
deleted file mode 100644
index 26ed471..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.topology.LocalEntranceProcessingItem;
-
-class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
-  public SimpleEntranceProcessingItem(EntranceProcessor processor) {
-    super(processor);
-  }
-
-  // The default waiting time when there is no available events is 100ms
-  // Override waitForNewEvents() to change it
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
deleted file mode 100644
index bac0398..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.Processor;
-import org.apache.samoa.topology.AbstractProcessingItem;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.topology.ProcessingItem;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.utils.PartitioningScheme;
-import org.apache.samoa.utils.StreamDestination;
-
-/**
- * 
- * @author abifet
- */
-class SimpleProcessingItem extends AbstractProcessingItem {
-  private IProcessingItem[] arrayProcessingItem;
-
-  SimpleProcessingItem(Processor processor) {
-    super(processor);
-  }
-
-  SimpleProcessingItem(Processor processor, int parallelism) {
-    super(processor);
-    this.setParallelism(parallelism);
-  }
-
-  public IProcessingItem getProcessingItem(int i) {
-    return arrayProcessingItem[i];
-  }
-
-  @Override
-  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
-    ((SimpleStream) inputStream).addDestination(destination);
-    return this;
-  }
-
-  public SimpleProcessingItem copy() {
-    Processor processor = this.getProcessor();
-    return new SimpleProcessingItem(processor.newProcessor(processor));
-  }
-
-  public void processEvent(ContentEvent event, int counter) {
-
-    int parallelism = this.getParallelism();
-    // System.out.println("Process event "+event+" 
(isLast="+event.isLastEvent()+") with counter="+counter+" while 
parallelism="+parallelism);
-    if (this.arrayProcessingItem == null && parallelism > 0) {
-      // Init processing elements, the first time they are needed
-      this.arrayProcessingItem = new IProcessingItem[parallelism];
-      for (int j = 0; j < parallelism; j++) {
-        arrayProcessingItem[j] = this.copy();
-        arrayProcessingItem[j].getProcessor().onCreate(j);
-      }
-    }
-    if (this.arrayProcessingItem != null) {
-      IProcessingItem pi = this.getProcessingItem(counter);
-      Processor p = pi.getProcessor();
-      // System.out.println("PI="+pi+", p="+p);
-      this.getProcessingItem(counter).getProcessor().process(event);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
deleted file mode 100644
index 8405463..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.topology.AbstractStream;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.utils.StreamDestination;
-
-/**
- * 
- * @author abifet
- */
-class SimpleStream extends AbstractStream {
-  private List<StreamDestination> destinations;
-  private int maxCounter;
-  private int eventCounter;
-
-  SimpleStream(IProcessingItem sourcePi) {
-    super(sourcePi);
-    this.destinations = new LinkedList<>();
-    this.eventCounter = 0;
-    this.maxCounter = 1;
-  }
-
-  private int getNextCounter() {
-    if (maxCounter > 0 && eventCounter >= maxCounter)
-      eventCounter = 0;
-    this.eventCounter++;
-    return this.eventCounter;
-  }
-
-  @Override
-  public void put(ContentEvent event) {
-    this.put(event, this.getNextCounter());
-  }
-
-  private void put(ContentEvent event, int counter) {
-    SimpleProcessingItem pi;
-    int parallelism;
-    for (StreamDestination destination : destinations) {
-      pi = (SimpleProcessingItem) destination.getProcessingItem();
-      parallelism = destination.getParallelism();
-      switch (destination.getPartitioningScheme()) {
-      case SHUFFLE:
-        pi.processEvent(event, counter % parallelism);
-        break;
-      case GROUP_BY_KEY:
-        HashCodeBuilder hb = new HashCodeBuilder();
-        hb.append(event.getKey());
-        int key = hb.build() % parallelism;
-        pi.processEvent(event, key);
-        break;
-      case BROADCAST:
-        for (int p = 0; p < parallelism; p++) {
-          pi.processEvent(event, p);
-        }
-        break;
-      }
-    }
-  }
-
-  public void addDestination(StreamDestination destination) {
-    this.destinations.add(destination);
-    if (maxCounter <= 0)
-      maxCounter = 1;
-    maxCounter *= destination.getParallelism();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a5cda69c/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
deleted file mode 100644
index d298b69..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.topology.AbstractTopology;
-
-public class SimpleTopology extends AbstractTopology {
-  SimpleTopology(String name) {
-    super(name);
-  }
-
-  public void run() {
-    if (this.getEntranceProcessingItems() == null)
-      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
-    if (this.getEntranceProcessingItems().size() != 1)
-      throw new IllegalStateException("SimpleTopology supports 1 entrance PI 
only. Number of entrance PIs is "
-          + this.getEntranceProcessingItems().size());
-
-    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) 
this.getEntranceProcessingItems()
-        .toArray()[0];
-    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
-    entrancePi.startSendingEvents();
-  }
-}

Reply via email to