Added Kafka Avro serializer with unit tests. Added Kafka task with unit test


Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/68c341bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/68c341bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/68c341bd

Branch: refs/heads/master
Commit: 68c341bddee77a8d9f873b18a4661482dc4b190f
Parents: 2a4bec9
Author: Jakub Jankowski <[email protected]>
Authored: Fri Apr 28 17:08:29 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../samoa/streams/kafka/KafkaAvroMapper.java    | 158 +++++++++++++++++++
 .../apache/samoa/streams/kafka/KafkaTask.java   | 148 +++++++++++++++++
 .../kafka/topology/SimpleComponentFactory.java  |  53 +++++++
 .../streams/kafka/topology/SimpleEngine.java    |  37 +++++
 .../topology/SimpleEntranceProcessingItem.java  |  33 ++++
 .../kafka/topology/SimpleProcessingItem.java    |  87 ++++++++++
 .../streams/kafka/topology/SimpleStream.java    |  95 +++++++++++
 .../streams/kafka/topology/SimpleTopology.java  |  46 ++++++
 samoa-api/src/main/resources/kafka.avsc         |  61 +++++++
 .../kafka/KafkaEntranceProcessorTest.java       |  74 +++++++--
 .../samoa/streams/kafka/KafkaTaskTest.java      | 138 ++++++++++++++++
 .../samoa/streams/kafka/KafkaUtilsTest.java     |  24 +--
 .../samoa/streams/kafka/TestUtilsForKafka.java  |  25 ++-
 13 files changed, 952 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
new file mode 100644
index 0000000..91902d0
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Sample class for serializing and deserializing {@link InstanceContentEvent}
+ * from/to Avro format
+ * 
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaAvroMapper implements 
KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
+
+       private static Logger logger = 
LoggerFactory.getLogger(KafkaAvroMapper.class);
+
+       @Override
+       public byte[] serialize(InstanceContentEvent message) {
+               return toBytesGeneric(InstanceContentEvent.class, message);
+       }
+
+       @Override
+       public InstanceContentEvent deserialize(byte[] message) {
+               return avroDeserialize(message, InstanceContentEvent.class, 
null);
+       }
+
+       public static <T> byte[] avroSerialize(Class<T> clazz, Object object) {
+               byte[] ret = null;
+               try {
+                       if (object == null || !(object instanceof 
SpecificRecord)) {
+                               return null;
+                       }
+
+                       T record = (T) object;
+                       ByteArrayOutputStream out = new ByteArrayOutputStream();
+                       Encoder e = 
EncoderFactory.get().directBinaryEncoder(out, null);
+                       SpecificDatumWriter<T> w = new 
SpecificDatumWriter<T>(clazz);
+                       w.write(record, e);
+                       e.flush();
+                       ret = out.toByteArray();
+               } catch (IOException e) {
+
+               }
+
+               return ret;
+       }
+
+       public static <T> T avroDeserialize(byte[] avroBytes, Class<T> clazz, 
Schema schema) {
+               T ret = null;
+               try {
+                       ByteArrayInputStream in = new 
ByteArrayInputStream(avroBytes);
+                       Decoder d = 
DecoderFactory.get().directBinaryDecoder(in, null);
+                       SpecificDatumReader<T> reader = new 
SpecificDatumReader<T>(clazz);
+                       ret = reader.read(null, d);
+               } catch (IOException e) {
+
+               }
+
+               return ret;
+       }
+
+       public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) {
+               final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+               final Schema schema = 
ReflectData.AllowNull.get().getSchema(cls);
+               final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
+               final BinaryEncoder binEncoder = 
EncoderFactory.get().binaryEncoder(bout, null);
+               try {
+                       writer.write(v, binEncoder);
+                       binEncoder.flush();
+               } catch (final Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               return bout.toByteArray();
+       }
+
+       public static <V> byte[] avroBurrSerialize(final Class<V> cls, final V 
v) {
+               ByteArrayOutputStream bout = new ByteArrayOutputStream();
+               try {
+                       Schema schema = new Schema.Parser().parse(new 
File("C:/java/avro/kafka.avsc"));
+                       DatumWriter<V> writer;
+
+                       if (v instanceof SpecificRecord) {
+                               writer = new SpecificDatumWriter<>(schema);
+                       } else {
+                               writer = new ReflectDatumWriter<>(schema);
+                       }
+                       
+                       BinaryEncoder binEncoder = 
EncoderFactory.get().binaryEncoder(bout, null);
+                       writer.write(v, binEncoder);
+            binEncoder.flush();
+            
+               } catch (IOException e) {
+                       e.printStackTrace();
+               } catch (final Exception e) {
+                       throw new RuntimeException(e);
+               }
+               
+               return bout.toByteArray();
+               
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
new file mode 100644
index 0000000..26012f2
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Properties;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * Kafka task
+ * 
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ *
+ */
+
+public class KafkaTask implements Task, Configurable {
+
+       private static final long serialVersionUID = 3984474041982397855L;
+       private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
+       
+       //czy identyczne dla enterance i destination?
+       Properties producerProps;
+       Properties consumerProps;
+       int timeout;
+       private final KafkaDeserializer deserializer;
+       private final KafkaSerializer serializer;
+       private final String topic;
+
+       private TopologyBuilder builder;
+       private Topology kafkaTopology;
+
+       public IntOption kafkaParallelismOption = new 
IntOption("parallelismOption", 'p',
+                       "Number of destination Processors", 1, 1, 
Integer.MAX_VALUE);
+
+       public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
+                       "KafkaTask" + new 
SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+       /**
+     * Class constructor
+     * @param props Properties of Kafka Producer and Consumer
+     * @see <a 
href="http://kafka.apache.org/documentation/#producerconfigs";>Kafka Producer 
configuration</a>
+     * @see <a 
href="http://kafka.apache.org/documentation/#consumerconfigs";>Kafka Consumer 
configuration</a>
+     * @param topic Topic to which destination processor will write into
+     * @param timeout Timeout used when polling Kafka for new messages
+     * @param serializer Implementation of KafkaSerializer that handles 
arriving data serialization
+     * @param serializer Implementation of KafkaDeserializer that handles 
arriving data deserialization
+     */
+       public KafkaTask(Properties producerProps, Properties consumerProps, 
String topic, int timeout, KafkaSerializer serializer, KafkaDeserializer 
deserializer) {
+               this.producerProps = producerProps;
+               this.consumerProps = consumerProps;
+               this.deserializer = deserializer;
+               this.serializer = serializer;
+               this.topic = topic;
+               this.timeout = timeout;
+       }
+
+       @Override
+       public void init() {
+               logger.info("Invoking init");
+               if (builder == null) {
+                       builder = new TopologyBuilder();
+                       logger.info("Successfully instantiating 
TopologyBuilder");
+
+                       builder.initTopology(evaluationNameOption.getValue());
+                       logger.info("Successfully initializing SAMOA topology 
with name {}", evaluationNameOption.getValue());
+               }
+               
+               // create enterance processor
+               KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, topic, timeout, deserializer);
+               builder.addEntranceProcessor(sourceProcessor);
+               
+               // create stream
+               Stream stream = builder.createStream(sourceProcessor);
+               
+               // create destination processor
+               KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, topic, serializer);
+               builder.addProcessor(destProcessor, 
kafkaParallelismOption.getValue());
+               builder.connectInputShuffleStream(stream, destProcessor);
+               
+               // build topology
+               kafkaTopology = builder.build();
+           logger.info("Successfully built the topology");
+       }
+
+       @Override
+       public Topology getTopology() {
+               return kafkaTopology;
+       }
+
+       @Override
+       public void setFactory(ComponentFactory factory) {
+               logger.info("Invoking setFactory: "+factory.toString());
+               builder = new TopologyBuilder(factory);
+           logger.info("Successfully instantiating TopologyBuilder");
+
+           builder.initTopology(evaluationNameOption.getValue());
+           logger.info("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
new file mode 100644
index 0000000..155ce1f
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleComponentFactory.java
@@ -0,0 +1,53 @@
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+public class SimpleComponentFactory implements ComponentFactory {
+
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    return new SimpleProcessingItem(processor, paralellism);
+  }
+
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
+
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new SimpleEntranceProcessingItem(processor);
+  }
+
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new SimpleStream(sourcePi);
+  }
+
+  public Topology createTopology(String topoName) {
+    return new SimpleTopology(topoName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
new file mode 100644
index 0000000..d446018
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEngine.java
@@ -0,0 +1,37 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.Topology;
+
+public class SimpleEngine {
+
+  public static void submitTopology(Topology topology) {
+    SimpleTopology simpleTopology = (SimpleTopology) topology;
+    simpleTopology.run();
+    // runs until completion
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
new file mode 100644
index 0000000..4c626dc
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleEntranceProcessingItem.java
@@ -0,0 +1,33 @@
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.LocalEntranceProcessingItem;
+
+class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
+  public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // The default waiting time when there is no available events is 100ms
+  // Override waitForNewEvents() to change it
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
new file mode 100644
index 0000000..3549b85
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleProcessingItem.java
@@ -0,0 +1,87 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleProcessingItem extends AbstractProcessingItem {
+  private IProcessingItem[] arrayProcessingItem;
+
+  SimpleProcessingItem(Processor processor) {
+    super(processor);
+  }
+
+  SimpleProcessingItem(Processor processor, int parallelism) {
+    super(processor);
+    this.setParallelism(parallelism);
+  }
+
+  public IProcessingItem getProcessingItem(int i) {
+    return arrayProcessingItem[i];
+  }
+
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
+    ((SimpleStream) inputStream).addDestination(destination);
+    return this;
+  }
+
+  public SimpleProcessingItem copy() {
+    Processor processor = this.getProcessor();
+    return new SimpleProcessingItem(processor.newProcessor(processor));
+  }
+
+  public void processEvent(ContentEvent event, int counter) {
+
+    int parallelism = this.getParallelism();
+    // System.out.println("Process event "+event+" 
(isLast="+event.isLastEvent()+") with counter="+counter+" while 
parallelism="+parallelism);
+    if (this.arrayProcessingItem == null && parallelism > 0) {
+      // Init processing elements, the first time they are needed
+      this.arrayProcessingItem = new IProcessingItem[parallelism];
+      for (int j = 0; j < parallelism; j++) {
+        arrayProcessingItem[j] = this.copy();
+        arrayProcessingItem[j].getProcessor().onCreate(j);
+      }
+    }
+    if (this.arrayProcessingItem != null) {
+      IProcessingItem pi = this.getProcessingItem(counter);
+      Processor p = pi.getProcessor();
+      // System.out.println("PI="+pi+", p="+p);
+      this.getProcessingItem(counter).getProcessor().process(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
new file mode 100644
index 0000000..269e0cc
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleStream.java
@@ -0,0 +1,95 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleStream extends AbstractStream {
+  private List<StreamDestination> destinations;
+  private int maxCounter;
+  private int eventCounter;
+
+  SimpleStream(IProcessingItem sourcePi) {
+    super(sourcePi);
+    this.destinations = new LinkedList<>();
+    this.eventCounter = 0;
+    this.maxCounter = 1;
+  }
+
+  private int getNextCounter() {
+    if (maxCounter > 0 && eventCounter >= maxCounter)
+      eventCounter = 0;
+    this.eventCounter++;
+    return this.eventCounter;
+  }
+
+  @Override
+  public void put(ContentEvent event) {
+    this.put(event, this.getNextCounter());
+  }
+
+  private void put(ContentEvent event, int counter) {
+    SimpleProcessingItem pi;
+    int parallelism;
+    for (StreamDestination destination : destinations) {
+      pi = (SimpleProcessingItem) destination.getProcessingItem();
+      parallelism = destination.getParallelism();
+      switch (destination.getPartitioningScheme()) {
+      case SHUFFLE:
+        pi.processEvent(event, counter % parallelism);
+        break;
+      case GROUP_BY_KEY:
+        HashCodeBuilder hb = new HashCodeBuilder();
+        hb.append(event.getKey());
+        int key = hb.build() % parallelism;
+        pi.processEvent(event, key);
+        break;
+      case BROADCAST:
+        for (int p = 0; p < parallelism; p++) {
+          pi.processEvent(event, p);
+        }
+        break;
+      }
+    }
+  }
+
+  public void addDestination(StreamDestination destination) {
+    this.destinations.add(destination);
+    if (maxCounter <= 0)
+      maxCounter = 1;
+    maxCounter *= destination.getParallelism();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
new file mode 100644
index 0000000..98dd7a5
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/topology/SimpleTopology.java
@@ -0,0 +1,46 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.streams.kafka.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+
+public class SimpleTopology extends AbstractTopology {
+  SimpleTopology(String name) {
+    super(name);
+  }
+
+  public void run() {
+    if (this.getEntranceProcessingItems() == null)
+      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
+    if (this.getEntranceProcessingItems().size() != 1)
+      throw new IllegalStateException("SimpleTopology supports 1 entrance PI 
only. Number of entrance PIs is "
+          + this.getEntranceProcessingItems().size());
+
+    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) 
this.getEntranceProcessingItems()
+        .toArray()[0];
+    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
+    entrancePi.startSendingEvents();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/main/resources/kafka.avsc
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/resources/kafka.avsc 
b/samoa-api/src/main/resources/kafka.avsc
new file mode 100644
index 0000000..c21e153
--- /dev/null
+++ b/samoa-api/src/main/resources/kafka.avsc
@@ -0,0 +1,61 @@
+[
+{
+  "type": "record",
+  "name": "InstanceData",
+  "fields": [
+       ]
+},
+{
+  "type": "record",
+  "name": "SingleClassInstanceData",
+  "fields": [
+               {"name":"classValue", "type": "double"}
+       ]
+},
+{
+  "type": "record",
+  "name": "DenseInstanceData",
+  "fields": [
+               {"name":"attributeValues", "type": {"type": "array", "items": 
"double"}}
+       ]
+},
+{
+  "type": "record",
+  "name": "SparseInstanceData",
+  "fields": [
+               {"name":"attributeValues", "type": {"type": "array", "items": 
"double"}},
+               {"name":"indexValues", "type": {"type": "array", "items": 
"int"}},
+               {"name":"numberAttributes", "type": "int"}
+       ]
+},
+{
+  "type": "record",
+  "name": "SerializableInstance",
+  "fields": [
+               {"name": "weight", "type": "double"},
+               {"name": "instanceData", "type": ["null", "InstanceData", 
"DenseInstanceData", "SparseInstanceData", "SingleClassInstanceData"]},
+               {"name": "classData", "type": "InstanceData"}
+       ]
+},
+{
+  "type": "record",
+  "name": "InstanceContent",
+  "fields": [
+               {"name": "instanceIndex", "type": "long"},
+               {"name": "classifierIndex", "type": "int"},
+               {"name": "evaluationIndex", "type": "int"},
+               {"name":"instance", "type":"SerializableInstance"},
+               {"name": "isTraining", "type": "boolean"},
+               {"name": "isTesting", "type": "boolean"},
+               {"name": "isLast", "type": "boolean"}
+       ]
+},
+{
+ "type": "record",
+ "name": "InstanceContentEvent",
+ "fields": [
+     {"name": "instanceContent", "type": "InstanceContent"}
+ ]
+}
+]
+   

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
index 2a92a31..3da9d6f 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -56,6 +56,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import static org.junit.Assert.*;
 import kafka.admin.AdminUtils;
@@ -84,15 +85,16 @@ import org.apache.samoa.streams.InstanceStream;
  *
  * @author pwawrzyniak
  */
+//@Ignore
 public class KafkaEntranceProcessorTest {
 
 //    @Tested
 //    private KafkaEntranceProcessor kep;
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private static final String TOPIC = "test";
-    private static final int NUM_INSTANCES = 500;
+    private static final String ZKHOST = "10.255.251.202";             
//10.255.251.202
+    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
+    private static final String BROKERPORT = "6667";           //6667, local: 
9092
+    private static final String TOPIC = "samoa_test";                          
//samoa_test, local: test
+    private static final int NUM_INSTANCES = 50;
     
     
     private static KafkaServer kafkaServer;
@@ -108,28 +110,28 @@ public class KafkaEntranceProcessorTest {
     public static void setUpClass() throws IOException {
         // setup Zookeeper
         zkServer = new EmbeddedZookeeper();
-        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
         zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
         ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
 
         // setup Broker
-        Properties brokerProps = new Properties();
+        /*Properties brokerProps = new Properties();
         brokerProps.setProperty("zookeeper.connect", zkConnect);
         brokerProps.setProperty("broker.id", "0");
         brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
         brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
         KafkaConfig config = new KafkaConfig(brokerProps);
         Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
+        kafkaServer = TestUtils.createServer(config, mock);*/
 
         // create topic
-        AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
 
     }
 
     @AfterClass
     public static void tearDownClass() {
-        kafkaServer.shutdown();
+        //kafkaServer.shutdown();
         zkClient.close();
         zkServer.shutdown();
     }
@@ -144,7 +146,7 @@ public class KafkaEntranceProcessorTest {
 
     }
 
-    @Test
+    /*@Test
     public void testFetchingNewData() throws InterruptedException, 
ExecutionException, TimeoutException {
 
         Logger logger = 
Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
@@ -187,6 +189,56 @@ public class KafkaEntranceProcessorTest {
         assertEquals("Number of sent and received instances", NUM_INSTANCES, 
z);        
       
      
+    }*/
+    
+    @Test
+    public void testFetchingNewDataWithAvro() throws InterruptedException, 
ExecutionException, TimeoutException {
+        Logger logger = 
Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
+        logger.log(Level.INFO, "AVRO");
+       logger.log(Level.INFO, "testFetchingNewDataWithAvro");
+        Properties props = TestUtilsForKafka.getConsumerProperties();
+        props.setProperty("auto.offset.reset", "earliest");
+        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC, 
10000, new KafkaAvroMapper());
+        kep.onCreate(1);
+
+//         prepare new thread for data producing
+        Thread th = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(TestUtilsForKafka.getProducerProperties());
+
+                Random r = new Random();
+                InstancesHeader header = TestUtilsForKafka.generateHeader(10);
+                KafkaAvroMapper avroMapper = new KafkaAvroMapper();
+                int i = 0;
+                for (i = 0; i < NUM_INSTANCES; i++) {
+                    try {
+                       //byte[] data = 
avroMapper.serialize(TestUtilsForKafka.getData(r, 10, header));
+                       byte[] data = 
KafkaAvroMapper.avroBurrSerialize(InstanceContentEvent.class, 
TestUtilsForKafka.getData(r, 10, header));
+                       if(data == null)
+                               
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Serialize result: null ("+i+")");
+                        ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC, data);
+                        long stat = producer.send(record).get(10, 
TimeUnit.DAYS).offset();
+                        Thread.sleep(5);
+                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Sent avro message with ID={0} to Kafka!, offset={1}", new Object[]{i, stat});
+                    } catch (InterruptedException | ExecutionException | 
TimeoutException ex) {
+                        
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.SEVERE, 
null, ex);
+                    }
+                }
+                producer.flush();
+                producer.close();
+            }
+        });
+        th.start();
+
+        int z = 0;
+        while (kep.hasNext() && z < NUM_INSTANCES) {
+            logger.log(Level.INFO, "{0} {1}", new Object[]{z++, 
kep.nextEvent().toString()});
+        }       
+
+        assertEquals("Number of sent and received instances", NUM_INSTANCES, 
z);        
+      
+     
     }
 
 //    private Properties getProducerProperties() {

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
new file mode 100644
index 0000000..31f34fb
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaTaskTest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.common.utils.Time;
+import org.apache.samoa.streams.kafka.topology.SimpleComponentFactory;
+import org.apache.samoa.streams.kafka.topology.SimpleEngine;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+*
+* @author Jakub Jankowski
+*/
+@Ignore
+public class KafkaTaskTest {
+       
+    private static final String ZKHOST = "10.255.251.202";             
//10.255.251.202
+    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
+    private static final String BROKERPORT = "6667";           //6667, local: 
9092
+    private static final String TOPIC = "samoa_test";                          
//samoa_test, local: test
+    private static final int NUM_INSTANCES = 500;
+    
+    
+    private static KafkaServer kafkaServer;
+    private static EmbeddedZookeeper zkServer;
+    private static ZkClient zkClient;
+    private static String zkConnect;
+    
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        // setup Zookeeper
+        zkServer = new EmbeddedZookeeper();
+        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
+        zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        /*Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);*/
+
+        // create topic
+        //AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+    }
+    
+    @AfterClass
+    public static void tearDownClass() {
+        //kafkaServer.shutdown(); 
+        zkClient.close();
+        zkServer.shutdown();
+    }
+
+    @Before
+    public void setUp() throws IOException {
+
+    }
+
+    @After
+    public void tearDown() {
+
+    }
+    
+    @Test
+    public void testKafkaTask() throws InterruptedException, 
ExecutionException, TimeoutException {
+        Logger logger = Logger.getLogger(KafkaTaskTest.class.getName());
+        logger.log(Level.INFO, "KafkaTask");
+        Properties producerProps = TestUtilsForKafka.getProducerProperties();
+        Properties consumerProps = TestUtilsForKafka.getConsumerProperties();
+            
+        KafkaTask task = new KafkaTask(producerProps, consumerProps, 
"kafkaTaskTest", 10000, new KafkaJsonMapper(Charset.defaultCharset()), new 
KafkaJsonMapper(Charset.defaultCharset()));
+        task.setFactory(new SimpleComponentFactory());
+        task.init();
+        SimpleEngine.submitTopology(task.getTopology());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
index 4cd5135..7c1c7c0 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaUtilsTest.java
@@ -72,6 +72,7 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -79,11 +80,12 @@ import static org.junit.Assert.*;
  *
  * @author pwawrzyniak
  */
+@Ignore
 public class KafkaUtilsTest {
 
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
+    private static final String ZKHOST = "10.255.251.202";             
//10.255.251.202
+    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
+    private static final String BROKERPORT = "6667";           //6667, local: 
9092
     private static final String TOPIC_R = "test-r";
     private static final String TOPIC_S = "test-s";
 
@@ -102,29 +104,29 @@ public class KafkaUtilsTest {
     public static void setUpClass() throws IOException {
         // setup Zookeeper
         zkServer = new EmbeddedZookeeper();
-        zkConnect = ZKHOST + ":" + zkServer.port();
+        zkConnect = ZKHOST + ":" + "2181"; //+ zkServer.port();
         zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
         ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
 
         // setup Broker
-        Properties brokerProps = new Properties();
+        /*Properties brokerProps = new Properties();
         brokerProps.setProperty("zookeeper.connect", zkConnect);
         brokerProps.setProperty("broker.id", "0");
         brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafkaUtils-").toAbsolutePath().toString());
         brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" 
+ BROKERPORT);
         KafkaConfig config = new KafkaConfig(brokerProps);
         Time mock = new MockTime();
-        kafkaServer = TestUtils.createServer(config, mock);
+        kafkaServer = TestUtils.createServer(config, mock);*/
 
         // create topics
-        AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
-        AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        //AdminUtils.createTopic(zkUtils, TOPIC_R, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+        //AdminUtils.createTopic(zkUtils, TOPIC_S, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
 
     }
 
     @AfterClass
     public static void tearDownClass() {
-        kafkaServer.shutdown();
+        //kafkaServer.shutdown();
         zkClient.close();
         zkServer.shutdown();
     }
@@ -167,7 +169,7 @@ public class KafkaUtilsTest {
         instance.initializeConsumer(topics);
 
         logger.log(Level.INFO, "Produce data");
-        List expResult = sendAndGetMessages(500);
+        List expResult = sendAndGetMessages(50);
 
         logger.log(Level.INFO, "Get results from Kafka");
         List<byte[]> result = instance.getKafkaMessages();
@@ -214,7 +216,7 @@ public class KafkaUtilsTest {
         Random r = new Random();
         InstancesHeader header = TestUtilsForKafka.generateHeader(10);
         Gson gson = new Gson();
-        for (int i = 0; i < 500; i++) {
+        for (int i = 0; i < 50; i++) {
             byte[] val = gson.toJson(TestUtilsForKafka.getData(r, 10, 
header)).getBytes();
             sent.add(val);
             instance.sendKafkaMessage(TOPIC_S, val);

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/68c341bd/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
index 0d30429..8d85fd7 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/TestUtilsForKafka.java
@@ -52,10 +52,10 @@ import org.apache.samoa.moa.core.FastVector;
  */
 public class TestUtilsForKafka {
 
-    private static final String ZKHOST = "127.0.0.1";
-    private static final String BROKERHOST = "127.0.0.1";
-    private static final String BROKERPORT = "9092";
-    private static final String TOPIC = "test";
+    private static final String ZKHOST = "10.255.251.202";             
//10.255.251.202
+    private static final String BROKERHOST = "10.255.251.214"; //10.255.251.214
+    private static final String BROKERPORT = "6667";           //6667, local: 
9092
+    private static final String TOPIC = "samoa_test";                          
//samoa_test, local: test
 
     protected static InstanceContentEvent getData(Random instanceRandom, int 
numAtts, InstancesHeader header) {
         double[] attVals = new double[numAtts + 1];
@@ -126,7 +126,22 @@ public class TestUtilsForKafka {
         consumerProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
         consumerProps.setProperty("group.id", "test");
         consumerProps.setProperty("auto.offset.reset", "earliest");
-//        consumerProps.setProperty("client.id", "consumer0");
+        //consumerProps.setProperty("client.id", "consumer0");
         return consumerProps;
     }
+    
+    protected static Properties getConsumerProducerProperties() {
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+        props.put("enable.auto.commit", "true");
+        props.put("auto.commit.interval.ms", "1000");
+        props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
+        props.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.setProperty("group.id", "burrito");
+        props.setProperty("auto.offset.reset", "earliest");
+        props.setProperty("client.id", "burrito");
+        return props;
+    }
 }

Reply via email to