repo cleanup, split code into 3 branches

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/8fbfde79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/8fbfde79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/8fbfde79

Branch: refs/heads/master
Commit: 8fbfde793f459cc680479d76f9d1b1fe86f561d5
Parents: fb17e7f
Author: pwawrzyniak <[email protected]>
Authored: Wed Jun 7 12:21:08 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../samoa/streams/kafka/KafkaAvroMapper.java    | 160 ---------
 .../samoa/streams/kafka/KafkaJsonMapper.java    | 106 ------
 .../streams/kafka/avro/SamoaDatumReader.java    | 115 -------
 .../kafka/topology/SimpleComponentFactory.java  |  53 ---
 .../streams/kafka/topology/SimpleEngine.java    |  37 ---
 .../topology/SimpleEntranceProcessingItem.java  |  33 --
 .../kafka/topology/SimpleProcessingItem.java    |  87 -----
 .../streams/kafka/topology/SimpleStream.java    |  95 ------
 .../streams/kafka/topology/SimpleTopology.java  |  46 ---
 .../kafka/AvroSerializerDeserializerTest.java   |  70 ----
 .../kafka/KafkaDestinationProcessorTest.java    |   2 +-
 .../kafka/KafkaEntranceProcessorTest.java       |  66 +---
 .../samoa/streams/kafka/KafkaTaskTest.java      | 327 +++++++++----------
 .../samoa/streams/kafka/OosTestSerializer.java  |  60 ++++
 .../kafka/topology/SimpleComponentFactory.java  |  53 +++
 .../streams/kafka/topology/SimpleEngine.java    |  37 +++
 .../topology/SimpleEntranceProcessingItem.java  |  33 ++
 .../kafka/topology/SimpleProcessingItem.java    |  87 +++++
 .../streams/kafka/topology/SimpleStream.java    |  95 ++++++
 .../streams/kafka/topology/SimpleTopology.java  |  46 +++
 20 files changed, 578 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
deleted file mode 100644
index a045bed..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.samoa.learners.InstanceContentEvent;
-import org.apache.samoa.streams.kafka.avro.SamoaDatumReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Sample class for serializing and deserializing {@link InstanceContentEvent}
- * from/to Avro format
- * 
- * @author Jakub Jankowski
- * @version 0.5.0-incubating-SNAPSHOT
- * @since 0.5.0-incubating
- */
-public class KafkaAvroMapper implements 
KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
-
-       private static Logger logger = 
LoggerFactory.getLogger(KafkaAvroMapper.class);
-
-       @Override
-       public byte[] serialize(InstanceContentEvent message) {
-               return avroSerialize(InstanceContentEvent.class, message);
-       }
-
-       @Override
-       public InstanceContentEvent deserialize(byte[] message) {
-               return avroDeserialize(message, InstanceContentEvent.class);
-       }
-
-
-       /** 
-        * Avro serialization based on specified schema
-        * @param cls
-        * @param v
-        * @return
-        */
-       public static <V> byte[] avroSerialize(final Class<V> cls, final V v) {
-               ByteArrayOutputStream bout = new ByteArrayOutputStream();
-               try {
-                       Schema schema = new 
Schema.Parser().parse(KafkaAvroMapper.class.getResourceAsStream("/kafka.avsc"));
-                       DatumWriter<V> writer;
-
-                       if (v instanceof SpecificRecord) {
-                               writer = new SpecificDatumWriter<>(schema);
-                       } else {
-                               writer = new ReflectDatumWriter<>(schema);
-                       }
-                       
-                       BinaryEncoder binEncoder = 
EncoderFactory.get().binaryEncoder(bout, null);
-                       writer.write(v, binEncoder);
-            binEncoder.flush();
-            
-               } catch (IOException e) {
-                       e.printStackTrace();
-               } catch (final Exception e) {
-                       throw new RuntimeException(e);
-               }
-               
-               return bout.toByteArray();
-               
-       }
-       
-       /** 
-        * Avro deserialization based on specified schema
-        * @param cls
-        * @param v
-        * @return
-        */
-       public static <V> V avroDeserialize(byte[] avroBytes, Class<V> clazz) {
-               V ret = null;
-               try {
-                       Schema schema = new 
Schema.Parser().parse(KafkaAvroMapper.class.getResourceAsStream("/kafka.avsc"));
-                       ByteArrayInputStream in = new 
ByteArrayInputStream(avroBytes);
-                       DatumReader<V> reader = new SamoaDatumReader<>(schema);
-                       
-                       Decoder decoder = 
DecoderFactory.get().directBinaryDecoder(in, null);
-                       
-                       ret = reader.read(null, decoder);
-               } catch (IOException e) {
-                       e.printStackTrace();
-               } catch (final Exception e) {
-                       throw new RuntimeException(e);
-               }
-               
-               return ret;
-       }
-       
-       /** 
-        * Avro serialization using reflection
-        * @param cls
-        * @param v
-        * @return
-        */
-       public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) {
-               final ByteArrayOutputStream bout = new ByteArrayOutputStream();
-               final Schema schema = 
ReflectData.AllowNull.get().getSchema(cls);
-               final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
-               final BinaryEncoder binEncoder = 
EncoderFactory.get().binaryEncoder(bout, null);
-               try {
-                       writer.write(v, binEncoder);
-                       binEncoder.flush();
-               } catch (final Exception e) {
-                       throw new RuntimeException(e);
-               }
-
-               return bout.toByteArray();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
deleted file mode 100644
index 2ac3e04..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.InstanceCreator;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import org.apache.samoa.instances.DenseInstanceData;
-import org.apache.samoa.instances.InstanceData;
-import org.apache.samoa.learners.InstanceContentEvent;
-
-/**
- * Sample class for serializing and deserializing {@link InstanceContentEvent}
- * from/to JSON format
- *
- * @author pwawrzyniak
- * @version 0.5.0-incubating-SNAPSHOT
- * @since 0.5.0-incubating
- */
-public class KafkaJsonMapper implements 
KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
-
-    private final transient Gson gson;
-    private final Charset charset;
-
-    /**
-     * Class constructor
-     *
-     * @param charset Charset to be used for bytes parsing
-     */
-    public KafkaJsonMapper(Charset charset) {
-        this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, 
new InstanceDataCustomDeserializer()).create();
-        this.charset = charset;
-    }
-
-    @Override
-    public InstanceContentEvent deserialize(byte[] message) {
-        return gson.fromJson(new String(message, this.charset), 
InstanceContentEvent.class);
-    }
-
-    @Override
-    public byte[] serialize(InstanceContentEvent message) {
-        return gson.toJson(message).getBytes(this.charset);
-    }
-
-    //Unused
-    public class InstanceDataCreator implements InstanceCreator<InstanceData> {
-
-        @Override
-        public InstanceData createInstance(Type type) {
-            return new DenseInstanceData();
-        }
-    }
-
-    public class InstanceDataCustomDeserializer implements 
JsonDeserializer<InstanceData> {
-
-        @Override
-        public DenseInstanceData deserialize(JsonElement je, Type type, 
JsonDeserializationContext jdc) throws JsonParseException {
-            double[] attributeValues = null;
-            JsonObject obj = (JsonObject) je;
-            attributeValues = jdc.deserialize(obj.get("attributeValues"), 
double[].class);
-            DenseInstanceData did = new DenseInstanceData(attributeValues);
-            return did;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java
deleted file mode 100644
index b7a18aa..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/avro/SamoaDatumReader.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.apache.samoa.streams.kafka.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.ResolvingDecoder;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.samoa.instances.DenseInstanceData;
-import org.apache.samoa.instances.SingleClassInstanceData;
-import org.apache.samoa.instances.SparseInstanceData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * DatumReader used to read objects built with InstanceData classes
- * @author Jakub Jankowski
- *
- * @param <T>
- */
-public class SamoaDatumReader<T> extends ReflectDatumReader<T> {
-
-       private static Logger logger = 
LoggerFactory.getLogger(SamoaDatumReader.class);
-
-       public SamoaDatumReader() {
-               super();
-       }
-
-       /** Construct for reading instances of a class. */
-       public SamoaDatumReader(Class<T> c) {
-               super(c);
-       }
-
-       /** Construct where the writer's and reader's schemas are the same. */
-       public SamoaDatumReader(Schema root) {
-               super(root);
-       }
-
-       /** Construct given writer's and reader's schema. */
-       public SamoaDatumReader(Schema writer, Schema reader) {
-               super(writer, reader);
-       }
-
-       /** Construct given writer's and reader's schema and the data model. */
-       public SamoaDatumReader(Schema writer, Schema reader, ReflectData data) 
{
-               super(writer, reader, data);
-       }
-
-       /** Construct given a {@link ReflectData}. */
-       public SamoaDatumReader(ReflectData data) {
-               super(data);
-       }
-
-       @Override
-       /**
-        * Called to read a record instance. Overridden to read InstanceData.
-        */
-       protected Object readRecord(Object old, Schema expected, 
ResolvingDecoder in) throws IOException {
-               Object r = getData().newRecord(old, expected);
-               Object state = null;
-
-               for (Field f : in.readFieldOrder()) {
-                       int pos = f.pos();
-                       String name = f.name();
-                       Object oldDatum = null;
-                       if (r instanceof DenseInstanceData) {
-                               r = readDenseInstanceData(r, f, oldDatum, in, 
state);
-                       } else if (r instanceof SparseInstanceData) {
-                               r = readSparseInstanceData(r, f, oldDatum, in, 
state);
-                       } else
-                               readField(r, f, oldDatum, in, state);
-               }
-               
-               return r;
-       }
-
-       private Object readDenseInstanceData(Object record, Field f, Object 
oldDatum, ResolvingDecoder in, Object state)
-                       throws IOException {
-               if (f.name().equals("attributeValues")) {
-                       Array atributes = (Array) read(oldDatum, f.schema(), 
in);
-                       double[] atributesArr = new double[atributes.size()];
-                       for (int i = 0; i < atributes.size(); i++) {
-                               atributesArr[i] = (double) atributes.get(i);
-                       }
-                       return new DenseInstanceData(atributesArr);
-               }
-               return null;
-       }
-       
-       private Object readSparseInstanceData(Object record, Field f, Object 
oldDatum, ResolvingDecoder in, Object state)
-                       throws IOException {
-               if(f.name().equals("attributeValues")) {
-                       Array atributes = (Array) read(oldDatum, f.schema(), 
in);
-                       double[] atributesArr = new double[atributes.size()];
-                       for (int i = 0; i < atributes.size(); i++) 
-                               atributesArr[i] = (double) atributes.get(i);
-                       
((SparseInstanceData)record).setAttributeValues(atributesArr);
-               }
-               if(f.name().equals("indexValues")) {
-                       Array indexValues = (Array) read(oldDatum, f.schema(), 
in);
-                       int[] indexValuesArr = new int[indexValues.size()];
-                       for (int i = 0; i < indexValues.size(); i++) {
-                               indexValuesArr[i] = (int) indexValues.get(i);
-                       }
-                       
((SparseInstanceData)record).setIndexValues(indexValuesArr);
-               }
-               return record;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
deleted file mode 100644
index 155ce1f..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.core.Processor;
-import org.apache.samoa.topology.ComponentFactory;
-import org.apache.samoa.topology.EntranceProcessingItem;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.topology.ProcessingItem;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.topology.Topology;
-
-public class SimpleComponentFactory implements ComponentFactory {
-
-  public ProcessingItem createPi(Processor processor, int paralellism) {
-    return new SimpleProcessingItem(processor, paralellism);
-  }
-
-  public ProcessingItem createPi(Processor processor) {
-    return this.createPi(processor, 1);
-  }
-
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
-    return new SimpleEntranceProcessingItem(processor);
-  }
-
-  public Stream createStream(IProcessingItem sourcePi) {
-    return new SimpleStream(sourcePi);
-  }
-
-  public Topology createTopology(String topoName) {
-    return new SimpleTopology(topoName);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
deleted file mode 100644
index d446018..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.topology.Topology;
-
-public class SimpleEngine {
-
-  public static void submitTopology(Topology topology) {
-    SimpleTopology simpleTopology = (SimpleTopology) topology;
-    simpleTopology.run();
-    // runs until completion
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
deleted file mode 100644
index 4c626dc..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.topology.LocalEntranceProcessingItem;
-
-class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
-  public SimpleEntranceProcessingItem(EntranceProcessor processor) {
-    super(processor);
-  }
-
-  // The default waiting time when there is no available events is 100ms
-  // Override waitForNewEvents() to change it
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
deleted file mode 100644
index 3549b85..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.Processor;
-import org.apache.samoa.topology.AbstractProcessingItem;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.topology.ProcessingItem;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.utils.PartitioningScheme;
-import org.apache.samoa.utils.StreamDestination;
-
-/**
- * 
- * @author abifet
- */
-class SimpleProcessingItem extends AbstractProcessingItem {
-  private IProcessingItem[] arrayProcessingItem;
-
-  SimpleProcessingItem(Processor processor) {
-    super(processor);
-  }
-
-  SimpleProcessingItem(Processor processor, int parallelism) {
-    super(processor);
-    this.setParallelism(parallelism);
-  }
-
-  public IProcessingItem getProcessingItem(int i) {
-    return arrayProcessingItem[i];
-  }
-
-  @Override
-  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
-    ((SimpleStream) inputStream).addDestination(destination);
-    return this;
-  }
-
-  public SimpleProcessingItem copy() {
-    Processor processor = this.getProcessor();
-    return new SimpleProcessingItem(processor.newProcessor(processor));
-  }
-
-  public void processEvent(ContentEvent event, int counter) {
-
-    int parallelism = this.getParallelism();
-    // System.out.println("Process event "+event+" 
(isLast="+event.isLastEvent()+") with counter="+counter+" while 
parallelism="+parallelism);
-    if (this.arrayProcessingItem == null && parallelism > 0) {
-      // Init processing elements, the first time they are needed
-      this.arrayProcessingItem = new IProcessingItem[parallelism];
-      for (int j = 0; j < parallelism; j++) {
-        arrayProcessingItem[j] = this.copy();
-        arrayProcessingItem[j].getProcessor().onCreate(j);
-      }
-    }
-    if (this.arrayProcessingItem != null) {
-      IProcessingItem pi = this.getProcessingItem(counter);
-      Processor p = pi.getProcessor();
-      // System.out.println("PI="+pi+", p="+p);
-      this.getProcessingItem(counter).getProcessor().process(event);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
deleted file mode 100644
index 269e0cc..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.topology.AbstractStream;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.utils.StreamDestination;
-
-/**
- * 
- * @author abifet
- */
-class SimpleStream extends AbstractStream {
-  private List<StreamDestination> destinations;
-  private int maxCounter;
-  private int eventCounter;
-
-  SimpleStream(IProcessingItem sourcePi) {
-    super(sourcePi);
-    this.destinations = new LinkedList<>();
-    this.eventCounter = 0;
-    this.maxCounter = 1;
-  }
-
-  private int getNextCounter() {
-    if (maxCounter > 0 && eventCounter >= maxCounter)
-      eventCounter = 0;
-    this.eventCounter++;
-    return this.eventCounter;
-  }
-
-  @Override
-  public void put(ContentEvent event) {
-    this.put(event, this.getNextCounter());
-  }
-
-  private void put(ContentEvent event, int counter) {
-    SimpleProcessingItem pi;
-    int parallelism;
-    for (StreamDestination destination : destinations) {
-      pi = (SimpleProcessingItem) destination.getProcessingItem();
-      parallelism = destination.getParallelism();
-      switch (destination.getPartitioningScheme()) {
-      case SHUFFLE:
-        pi.processEvent(event, counter % parallelism);
-        break;
-      case GROUP_BY_KEY:
-        HashCodeBuilder hb = new HashCodeBuilder();
-        hb.append(event.getKey());
-        int key = hb.build() % parallelism;
-        pi.processEvent(event, key);
-        break;
-      case BROADCAST:
-        for (int p = 0; p < parallelism; p++) {
-          pi.processEvent(event, p);
-        }
-        break;
-      }
-    }
-  }
-
-  public void addDestination(StreamDestination destination) {
-    this.destinations.add(destination);
-    if (maxCounter <= 0)
-      maxCounter = 1;
-    maxCounter *= destination.getParallelism();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
deleted file mode 100644
index 98dd7a5..0000000
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package org.apache.samoa.streams.kafka.topology;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.topology.AbstractTopology;
-
-public class SimpleTopology extends AbstractTopology {
-  SimpleTopology(String name) {
-    super(name);
-  }
-
-  public void run() {
-    if (this.getEntranceProcessingItems() == null)
-      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
-    if (this.getEntranceProcessingItems().size() != 1)
-      throw new IllegalStateException("SimpleTopology supports 1 entrance PI 
only. Number of entrance PIs is "
-          + this.getEntranceProcessingItems().size());
-
-    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) 
this.getEntranceProcessingItems()
-        .toArray()[0];
-    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
-    entrancePi.startSendingEvents();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java
deleted file mode 100644
index 1a1a718..0000000
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/AvroSerializerDeserializerTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.samoa.streams.kafka;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-import java.util.logging.Logger;
-
-import org.apache.samoa.instances.InstancesHeader;
-import org.apache.samoa.learners.InstanceContentEvent;
-import org.apache.samoa.streams.kafka.KafkaAvroMapper;
-import org.junit.Test;
-
-public class AvroSerializerDeserializerTest {
-
-       private Logger logger = 
Logger.getLogger(AvroSerializerDeserializerTest.class.getName());
-       public AvroSerializerDeserializerTest() {}
-       
-       @Test
-       public void testAvroSerialize() {
-               Random r = new Random();
-        InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-        InstanceContentEvent eventToSerialize = TestUtilsForKafka.getData(r, 
10, header);
-               byte[] data = 
KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, eventToSerialize);
-               
-               InstanceContentEvent eventDeserialized = 
KafkaAvroMapper.avroDeserialize(data, InstanceContentEvent.class);
-               
-               assertTrue("Serialized and deserialized event", 
isEqual(eventToSerialize, eventDeserialized));
-               
-       }
-       
-       public boolean isEqual(InstanceContentEvent a, InstanceContentEvent b) {
-               if(a.getClassId() != b.getClassId()) {
-                       logger.info("a.getClassId() != b.getClassId(): " + 
(a.getClassId() != b.getClassId()));
-                       return false;
-               }
-               if(a.isLastEvent() != b.isLastEvent()) {
-                       logger.info("a.isLastEvent() != b.isLastEvent(): " + 
(a.isLastEvent() != b.isLastEvent()));
-                       return false;
-               }
-               if(a.isTesting() != b.isTesting()) {
-                       logger.info("a.isTesting() != b.isTesting(): " + 
(a.isTesting() != b.isTesting()));
-                       return false;
-               }
-               if(a.isTraining() != b.isTraining()) {
-                       logger.info("a.isTraining() != b.isTraining(): " + 
(a.isTraining() != b.isTraining()));
-                       return false;
-               }
-               if(a.getClassifierIndex() != b.getClassifierIndex()) {
-                       logger.info("a.getClassifierIndex() != 
b.getClassifierIndex(): " + (a.getClassifierIndex() != b.getClassifierIndex()));
-                       return false;
-               }
-               if(a.getEvaluationIndex() != b.getEvaluationIndex()) {
-                       logger.info("a.getEvaluationIndex() != 
b.getEvaluationIndex(): " + (a.getEvaluationIndex() != b.getEvaluationIndex()));
-                       return false;
-               }
-               if(a.getInstanceIndex() != b.getInstanceIndex()) {
-                       logger.info("a.getInstanceIndex() != 
b.getInstanceIndex(): " + (a.getInstanceIndex() != b.getInstanceIndex()));
-                       return false;
-               }
-               
if(!a.getInstance().toString().equals(b.getInstance().toString())) {
-                       logger.info("a.getInstance().toString()!= 
b.getInstance().toString(): " + (a.getInstance().toString()!= 
b.getInstance().toString()));
-                       logger.info("a.toString(): " + 
a.getInstance().toString());
-                       logger.info("b.toString(): " + 
b.getInstance().toString());
-                       return false;
-               }
-               
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
index bf45ffb..2d59456 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessorTest.java
@@ -135,7 +135,7 @@ public class KafkaDestinationProcessorTest {
         final Logger logger = 
Logger.getLogger(KafkaDestinationProcessorTest.class.getName());
         Properties props = 
TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT);
         props.setProperty("auto.offset.reset", "earliest");
-        KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, 
TOPIC, new KafkaJsonMapper(Charset.defaultCharset()));
+        KafkaDestinationProcessor kdp = new KafkaDestinationProcessor(props, 
TOPIC, new OosTestSerializer());
         kdp.onCreate(1);
 
         final int[] i = {0};

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
index 933ba2a..b8b5c72 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -138,80 +138,32 @@ public class KafkaEntranceProcessorTest {
     }
 
     @Test
-    public void testFetchingNewDataWithJson() throws InterruptedException, 
ExecutionException, TimeoutException {
+    public void testFetchingNewData() throws InterruptedException, 
ExecutionException, TimeoutException {
 
         final Logger logger = 
Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
-        logger.log(Level.INFO, "JSON");
-        logger.log(Level.INFO, "testFetchingNewDataWithJson");
+        logger.log(Level.INFO, "OOS");
+        logger.log(Level.INFO, "testFetchingNewData");
         Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, 
BROKERPORT);
         props.setProperty("auto.offset.reset", "earliest");
-        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, 
TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset()));
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, 
TOPIC_JSON, TIMEOUT, new OosTestSerializer());
 
         kep.onCreate(1);
-       
+
         // prepare new thread for data producing
         Thread th = new Thread(new Runnable() {
             @Override
             public void run() {
-                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT));
+                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, 
BROKERPORT));
 
                 Random r = new Random();
                 InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-                Gson gson = new Gson();
+                OosTestSerializer serializer = new OosTestSerializer();
                 int i = 0;
                 for (i = 0; i < NUM_INSTANCES; i++) {
                     try {
                         InstanceContentEvent event = 
TestUtilsForKafka.getData(r, 10, header);
-                                             
-                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes());
-                        long stat = producer.send(record).get(10, 
TimeUnit.SECONDS).offset();
-                    } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
-                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
-                    }
-                }
-                producer.flush();
-                producer.close();
-            }
-        });
-        th.start();
 
-        int z = 0;
-        while (z < NUM_INSTANCES && kep.hasNext()) {
-            InstanceContentEvent event = (InstanceContentEvent) 
kep.nextEvent();            
-            z++;
-        }
-        
-        assertEquals("Number of sent and received instances", NUM_INSTANCES, 
z);
-
-    }
-
-    @Test
-    public void testFetchingNewDataWithAvro() throws InterruptedException, 
ExecutionException, TimeoutException {
-        Logger logger = 
Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
-        logger.log(Level.INFO, "AVRO");
-        logger.log(Level.INFO, "testFetchingNewDataWithAvro");
-        Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, 
BROKERPORT);
-        props.setProperty("auto.offset.reset", "earliest");
-        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, 
TOPIC_AVRO, TIMEOUT, new KafkaAvroMapper());
-        kep.onCreate(1);
-
-//         prepare new thread for data producing
-        Thread th = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT));
-
-                Random r = new Random();
-                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-
-                int i = 0;
-                for (i = 0; i < NUM_INSTANCES; i++) {
-                    try {
-                        byte[] data = 
KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, 
TestUtilsForKafka.getData(r, 10, header));
-                        if (data == null) {
-                            
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Serialize result: null ({0})", i);
-                        }
-                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_AVRO, data);
+                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC_JSON, serializer.serialize(event));
                         long stat = producer.send(record).get(10, 
TimeUnit.SECONDS).offset();
                     } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
                         
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
@@ -227,9 +179,9 @@ public class KafkaEntranceProcessorTest {
         while (z < NUM_INSTANCES && kep.hasNext()) {
             InstanceContentEvent event = (InstanceContentEvent) 
kep.nextEvent();
             z++;
-//            logger.log(Level.INFO, "{0} {1}", new Object[]{z, 
event.getInstance().toString()});
         }
 
         assertEquals("Number of sent and received instances", NUM_INSTANCES, 
z);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
index 08aae11..4215b08 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
@@ -1,170 +1,157 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import com.google.gson.Gson;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.common.utils.Time;
-import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory;
-import org.apache.samoa.streams.kafka.topology.SimpleEngine;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.samoa.instances.InstancesHeader;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2017 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
-*
-* @author Jakub Jankowski
-*/
-@Ignore
-public class KafkaTaskTest {
-       
-    private static final String ZKHOST = "127.0.0.1";//10.255.251.202";        
        //10.255.251.202
-    private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214";   
//10.255.251.214
-    private static final String BROKERPORT = "9092";           //6667, local: 
9092
-    private static final String TOPIC = "samoa_test";                          
//samoa_test, local: test
-    private static final int NUM_INSTANCES = 125922;
-    
-    
-    private static KafkaServer kafkaServer;
-    private static EmbeddedZookeeper zkServer;
-    private static ZkClient zkClient;
-    private static String zkConnect;
-    
-    @BeforeClass
-    public static void setUpClass() throws IOException {
-        // setup Zookeeper
-//        zkServer = new EmbeddedZookeeper();
-//        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
-//        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
-//        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
-        // setup Broker
-        /*Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
-        brokerProps.setProperty("broker.id", "0");
-        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
-        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
-        KafkaConfig config = new KafkaConfig(brokerProps);
-        Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);*/
-
-        // create topic
-        //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-    }
-    
-    @AfterClass
-    public static void tearDownClass() {
-        //kafkaServer.shutdown(); 
-//        zkClient.close();
-//        zkServer.shutdown();
-    }
-
-    @Before
-    public void setUp() throws IOException {
-
-    }
-
-    @After
-    public void tearDown() {
-
-    }
-    
-    @Test
-    public void testKafkaTask() throws InterruptedException, 
ExecutionException, TimeoutException {
-        Logger logger = Logger.getLogger(KafkaTaskTest.class.getName());
-        logger.log(Level.INFO, "KafkaTask");
-        Properties producerProps = 
TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT);
-        Properties consumerProps = 
TestUtilsForKafka.getConsumerProperties(BROKERHOST,BROKERPORT);
-            
-        KafkaTask task = new KafkaTask(producerProps, consumerProps, 
"kafkaTaskTest", 10000, new KafkaJsonMapper(Charset.defaultCharset()), new 
KafkaJsonMapper(Charset.defaultCharset()));
-        task.setFactory(new SimpleComponentFactory());
-        task.init();
-        SimpleEngine.submitTopology(task.getTopology());
-        
-                Thread th = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST,BROKERPORT));
-
-                Random r = new Random();
-                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
-                Gson gson = new Gson();
-                int i = 0;
-                for (i = 0; i < NUM_INSTANCES; i++) {
-                    try {
-                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC, gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes());
-                        long stat = producer.send(record).get(10, 
TimeUnit.DAYS).offset();
-//                        Thread.sleep(5);
-                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
-                    } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
-                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
-                    }
-                }
-                producer.flush();
-                producer.close();
-            }
-        });
-        th.start();
-        
-    }
-}
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+import org.I0Itec.zkclient.ZkClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import kafka.server.KafkaServer;
+import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory;
+import org.apache.samoa.streams.kafka.topology.SimpleEngine;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+/**
+ *
+ * @author Jakub Jankowski
+ */
+@Ignore
+public class KafkaTaskTest {
+
+    private static final String ZKHOST = "127.0.0.1";//10.255.251.202";        
        //10.255.251.202
+    private static final String BROKERHOST = "127.0.0.1";//"10.255.251.214";   
//10.255.251.214
+    private static final String BROKERPORT = "9092";           //6667, local: 
9092
+    private static final String TOPIC = "samoa_test";                          
//samoa_test, local: test
+    private static final int NUM_INSTANCES = 125922;
+
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+//        zkServer = new EmbeddedZookeeper();
+//        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
+//        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+//        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        /*Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);*/
+        // create topic
+        //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+        //kafkaServer.shutdown(); 
+//        zkClient.close();
+//        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() throws IOException {
+
+    }
+
+    @After
+    public void tearDown() {
+
+    }
+
+    @Test
+    public void testKafkaTask() throws InterruptedException, 
ExecutionException, TimeoutException {
+        Logger logger = Logger.getLogger(KafkaTaskTest.class.getName());
+        logger.log(Level.INFO, "KafkaTask");
+        Properties producerProps = 
TestUtilsForKafka.getProducerProperties(BROKERHOST, BROKERPORT);
+        Properties consumerProps = 
TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
+
+        KafkaTask task = new KafkaTask(producerProps, consumerProps, 
"kafkaTaskTest", 10000, new OosTestSerializer(), new OosTestSerializer());
+        task.setFactory(new SimpleComponentFactory());
+        task.init();
+        SimpleEngine.submitTopology(task.getTopology());
+
+        Thread th = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties(BROKERHOST, 
BROKERPORT));
+
+                Random r = new Random();
+                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+                OosTestSerializer serializer = new OosTestSerializer();
+                int i = 0;
+                for (i = 0; i < NUM_INSTANCES; i++) {
+                    try {
+                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC, serializer.serialize(TestUtilsForKafka.getData(r, 10, 
header)));
+                        long stat = producer.send(record).get(10, 
TimeUnit.DAYS).offset();
+//                        Thread.sleep(5);
+                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Sent message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+                    } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
+                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
+                    }
+                }
+                producer.flush();
+                producer.close();
+            }
+        });
+        th.start();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
new file mode 100644
index 0000000..649d3e0
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/OosTestSerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.learners.InstanceContentEvent;
+
+/**
+ *
+ * @author Piotr Wawrzyniak
+ */
+public class OosTestSerializer implements 
KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
+
+    @Override
+    public InstanceContentEvent deserialize(byte[] message) {
+        try {
+            ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(message));
+            InstanceContentEvent ice = (InstanceContentEvent)ois.readObject();
+            return ice;
+        } catch (IOException | ClassNotFoundException ex) {
+            
Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] serialize(InstanceContentEvent message) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(message);
+            oos.flush();
+            return baos.toByteArray();            
+        } catch (IOException ex) {
+            
Logger.getLogger(OosTestSerializer.class.getName()).log(Level.SEVERE, null, ex);
+        }        
+        return null;
+    }
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
new file mode 100644
index 0000000..202833e
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
@@ -0,0 +1,53 @@
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+public class SimpleComponentFactory implements ComponentFactory {
+
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    return new SimpleProcessingItem(processor, paralellism);
+  }
+
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
+
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new SimpleEntranceProcessingItem(processor);
+  }
+
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new SimpleStream(sourcePi);
+  }
+
+  public Topology createTopology(String topoName) {
+    return new SimpleTopology(topoName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
new file mode 100644
index 0000000..338444b
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
@@ -0,0 +1,37 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.Topology;
+
+public class SimpleEngine {
+
+  public static void submitTopology(Topology topology) {
+    SimpleTopology simpleTopology = (SimpleTopology) topology;
+    simpleTopology.run();
+    // runs until completion
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
new file mode 100644
index 0000000..26ed471
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
@@ -0,0 +1,33 @@
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.LocalEntranceProcessingItem;
+
+class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
+  public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // The default waiting time when there is no available events is 100ms
+  // Override waitForNewEvents() to change it
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
new file mode 100644
index 0000000..bac0398
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
@@ -0,0 +1,87 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleProcessingItem extends AbstractProcessingItem {
+  private IProcessingItem[] arrayProcessingItem;
+
+  SimpleProcessingItem(Processor processor) {
+    super(processor);
+  }
+
+  SimpleProcessingItem(Processor processor, int parallelism) {
+    super(processor);
+    this.setParallelism(parallelism);
+  }
+
+  public IProcessingItem getProcessingItem(int i) {
+    return arrayProcessingItem[i];
+  }
+
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
+    ((SimpleStream) inputStream).addDestination(destination);
+    return this;
+  }
+
+  public SimpleProcessingItem copy() {
+    Processor processor = this.getProcessor();
+    return new SimpleProcessingItem(processor.newProcessor(processor));
+  }
+
+  public void processEvent(ContentEvent event, int counter) {
+
+    int parallelism = this.getParallelism();
+    // System.out.println("Process event "+event+" 
(isLast="+event.isLastEvent()+") with counter="+counter+" while 
parallelism="+parallelism);
+    if (this.arrayProcessingItem == null && parallelism > 0) {
+      // Init processing elements, the first time they are needed
+      this.arrayProcessingItem = new IProcessingItem[parallelism];
+      for (int j = 0; j < parallelism; j++) {
+        arrayProcessingItem[j] = this.copy();
+        arrayProcessingItem[j].getProcessor().onCreate(j);
+      }
+    }
+    if (this.arrayProcessingItem != null) {
+      IProcessingItem pi = this.getProcessingItem(counter);
+      Processor p = pi.getProcessor();
+      // System.out.println("PI="+pi+", p="+p);
+      this.getProcessingItem(counter).getProcessor().process(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
new file mode 100644
index 0000000..8405463
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
@@ -0,0 +1,95 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleStream extends AbstractStream {
+  private List<StreamDestination> destinations;
+  private int maxCounter;
+  private int eventCounter;
+
+  SimpleStream(IProcessingItem sourcePi) {
+    super(sourcePi);
+    this.destinations = new LinkedList<>();
+    this.eventCounter = 0;
+    this.maxCounter = 1;
+  }
+
+  private int getNextCounter() {
+    if (maxCounter > 0 && eventCounter >= maxCounter)
+      eventCounter = 0;
+    this.eventCounter++;
+    return this.eventCounter;
+  }
+
+  @Override
+  public void put(ContentEvent event) {
+    this.put(event, this.getNextCounter());
+  }
+
+  private void put(ContentEvent event, int counter) {
+    SimpleProcessingItem pi;
+    int parallelism;
+    for (StreamDestination destination : destinations) {
+      pi = (SimpleProcessingItem) destination.getProcessingItem();
+      parallelism = destination.getParallelism();
+      switch (destination.getPartitioningScheme()) {
+      case SHUFFLE:
+        pi.processEvent(event, counter % parallelism);
+        break;
+      case GROUP_BY_KEY:
+        HashCodeBuilder hb = new HashCodeBuilder();
+        hb.append(event.getKey());
+        int key = hb.build() % parallelism;
+        pi.processEvent(event, key);
+        break;
+      case BROADCAST:
+        for (int p = 0; p < parallelism; p++) {
+          pi.processEvent(event, p);
+        }
+        break;
+      }
+    }
+  }
+
+  public void addDestination(StreamDestination destination) {
+    this.destinations.add(destination);
+    if (maxCounter <= 0)
+      maxCounter = 1;
+    maxCounter *= destination.getParallelism();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/8fbfde79/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
new file mode 100644
index 0000000..d298b69
--- /dev/null
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
@@ -0,0 +1,46 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+
+public class SimpleTopology extends AbstractTopology {
+  SimpleTopology(String name) {
+    super(name);
+  }
+
+  public void run() {
+    if (this.getEntranceProcessingItems() == null)
+      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
+    if (this.getEntranceProcessingItems().size() != 1)
+      throw new IllegalStateException("SimpleTopology supports 1 entrance PI 
only. Number of entrance PIs is "
+          + this.getEntranceProcessingItems().size());
+
+    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) 
this.getEntranceProcessingItems()
+        .toArray()[0];
+    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
+    entrancePi.startSendingEvents();
+  }
+}

Reply via email to