http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEngine.java 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEngine.java
new file mode 100644
index 0000000..9c30036
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEngine.java
@@ -0,0 +1,194 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
+import org.apache.samoa.utils.SamzaConfigFactory;
+import org.apache.samoa.utils.SystemsUtils;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.JobRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class will submit a list of Samza jobs with the Configs generated from 
the input topology
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class SamzaEngine {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SamzaEngine.class);
+
+  /*
+   * Singleton instance
+   */
+  private static SamzaEngine engine = new SamzaEngine();
+
+  private String zookeeper;
+  private String kafka;
+  private int kafkaReplicationFactor;
+  private boolean isLocalMode;
+  private String yarnPackagePath;
+  private String yarnConfHome;
+
+  private String kryoRegisterFile;
+
+  private int amMem;
+  private int containerMem;
+  private int piPerContainerRatio;
+
+  private int checkpointFrequency;
+
+  private void _submitTopology(SamzaTopology topology) {
+
+    // Setup SamzaConfigFactory
+    SamzaConfigFactory configFactory = new SamzaConfigFactory();
+    configFactory.setLocalMode(isLocalMode)
+        .setZookeeper(zookeeper)
+        .setKafka(kafka)
+        .setYarnPackage(yarnPackagePath)
+        .setAMMemory(amMem)
+        .setContainerMemory(containerMem)
+        .setPiPerContainerRatio(piPerContainerRatio)
+        .setKryoRegisterFile(kryoRegisterFile)
+        .setCheckpointFrequency(checkpointFrequency)
+        .setReplicationFactor(kafkaReplicationFactor);
+
+    // Generate the list of Configs
+    List<MapConfig> configs;
+    try {
+      // ConfigFactory generate a list of configs
+      // Serialize a map of PIs and store in a file in the jar at jarFilePath
+      // (in dat/ folder)
+      configs = configFactory.getMapConfigsForTopology(topology);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
+
+    // Create kafka streams
+    Set<Stream> streams = topology.getStreams();
+    for (Stream stream : streams) {
+      SamzaStream samzaStream = (SamzaStream) stream;
+      List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams();
+      for (SamzaSystemStream systemStream : systemStreams) {
+        // all streams should be kafka streams
+        SystemsUtils.createKafkaTopic(systemStream.getStream(), 
systemStream.getParallelism(), kafkaReplicationFactor);
+      }
+    }
+
+    // Submit the jobs with those configs
+    for (MapConfig config : configs) {
+      logger.info("Config:{}", config);
+      JobRunner jobRunner = new JobRunner(config);
+      jobRunner.run();
+    }
+  }
+
+  private void _setupSystemsUtils() {
+    // Setup Utils
+    if (!isLocalMode)
+      SystemsUtils.setHadoopConfigHome(yarnConfHome);
+    SystemsUtils.setZookeeper(zookeeper);
+  }
+
+  /*
+   * Setter methods
+   */
+  public static SamzaEngine getEngine() {
+    return engine;
+  }
+
+  public SamzaEngine setZooKeeper(String zk) {
+    this.zookeeper = zk;
+    return this;
+  }
+
+  public SamzaEngine setKafka(String kafka) {
+    this.kafka = kafka;
+    return this;
+  }
+
+  public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
+    this.kafkaReplicationFactor = replicationFactor;
+    return this;
+  }
+
+  public SamzaEngine setCheckpointFrequency(int freq) {
+    this.checkpointFrequency = freq;
+    return this;
+  }
+
+  public SamzaEngine setLocalMode(boolean isLocal) {
+    this.isLocalMode = isLocal;
+    return this;
+  }
+
+  public SamzaEngine setYarnPackage(String yarnPackagePath) {
+    this.yarnPackagePath = yarnPackagePath;
+    return this;
+  }
+
+  public SamzaEngine setConfigHome(String configHome) {
+    this.yarnConfHome = configHome;
+    return this;
+  }
+
+  public SamzaEngine setAMMemory(int mem) {
+    this.amMem = mem;
+    return this;
+  }
+
+  public SamzaEngine setContainerMemory(int mem) {
+    this.containerMem = mem;
+    return this;
+  }
+
+  public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
+    this.piPerContainerRatio = piPerContainer;
+    return this;
+  }
+
+  public SamzaEngine setKryoRegisterFile(String registerFile) {
+    this.kryoRegisterFile = registerFile;
+    return this;
+  }
+
+  /**
+   * Submit a list of Samza jobs correspond to the submitted topology
+   * 
+   * @param topo
+   *          the submitted topology
+   */
+  public static void submitTopology(SamzaTopology topo) {
+    // Setup SystemsUtils
+    engine._setupSystemsUtils();
+
+    // Submit topology
+    engine._submitTopology(topo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEntranceProcessingItem.java
 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEntranceProcessingItem.java
new file mode 100644
index 0000000..5f00b79
--- /dev/null
+++ 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaEntranceProcessingItem.java
@@ -0,0 +1,228 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.SamzaConfigFactory;
+import org.apache.samoa.utils.SystemsUtils;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * EntranceProcessingItem for Samza which is also a Samza task (StreamTask & 
InitableTask)
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem
+    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = 7157734520046135039L;
+
+  /*
+   * Constructors
+   */
+  public SamzaEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // Need this so Samza can initialize a StreamTask
+  public SamzaEntranceProcessingItem() {
+  }
+
+  /*
+   * Simple setters, getters
+   */
+  @Override
+  public int addOutputStream(SamzaStream stream) {
+    this.setOutputStream(stream);
+    return 1; // entrance PI should have only 1 output stream
+  }
+
+  /*
+   * Serialization
+   */
+  private Object writeReplace() {
+    return new SerializationProxy(this);
+  }
+
+  private static class SerializationProxy implements Serializable {
+    /**
+                * 
+                */
+    private static final long serialVersionUID = 313907132721414634L;
+
+    private EntranceProcessor processor;
+    private SamzaStream outputStream;
+    private String name;
+
+    public SerializationProxy(SamzaEntranceProcessingItem epi) {
+      this.processor = epi.getProcessor();
+      this.outputStream = (SamzaStream) epi.getOutputStream();
+      this.name = epi.getName();
+    }
+  }
+
+  /*
+   * Implement Samza Task
+   */
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+    if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property 
is set, otherwise, assume we are running in local mode and ignore this
+      SystemsUtils.setHadoopConfigHome(yarnConfHome);
+
+    String filename = config.get(SamzaConfigFactory.FILE_KEY);
+    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+
+    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
+    SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
+        filename, this.getName());
+    this.setOutputStream(wrapper.outputStream);
+    SamzaStream output = (SamzaStream) this.getOutputStream();
+    if (output != null) // if output stream exists, set it up
+      output.onCreate();
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
+      throws Exception {
+    SamzaStream output = (SamzaStream) this.getOutputStream();
+    if (output == null)
+      return; // if there is no output stream, do nothing
+    output.setCollector(collector);
+    ContentEvent event = (ContentEvent) envelope.getMessage();
+    output.put(event);
+  }
+
+  /*
+   * Implementation of Samza's SystemConsumer to get events from source and 
feed
+   * to SAMOA system
+   */
+  /*
+   * Current implementation: buffer the incoming events and send a batch of 
them
+   * when poll() is called by Samza system.
+   * 
+   * Currently: it has a "soft" limit on the size of the buffer: when the 
buffer
+   * size reaches the limit, the reading thread will sleep for 100ms. A hard
+   * limit can be achieved by overriding the method protected
+   * BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() of
+   * BlockingEnvelopeMap But then we have handle the case when the queue is
+   * full.
+   */
+  public static class SamoaSystemConsumer extends BlockingEnvelopeMap {
+
+    private EntranceProcessor entranceProcessor = null;
+    private SystemStreamPartition systemStreamPartition;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SamoaSystemConsumer.class);
+
+    public SamoaSystemConsumer(String systemName, Config config) {
+      String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+      if (yarnConfHome != null && yarnConfHome.length() > 0) // if the 
property is set, otherwise, assume we are running in local mode and ignore this
+        SystemsUtils.setHadoopConfigHome(yarnConfHome);
+
+      String filename = config.get(SamzaConfigFactory.FILE_KEY);
+      String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+      String name = config.get(SamzaConfigFactory.JOB_NAME_KEY);
+      SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
+          filename, name);
+
+      this.entranceProcessor = wrapper.processor;
+      this.entranceProcessor.onCreate(0);
+
+      // Internal stream from SystemConsumer to EntranceTask, so we
+      // need only one partition
+      this.systemStreamPartition = new SystemStreamPartition(systemName, 
wrapper.name, new Partition(0));
+    }
+
+    @Override
+    public void start() {
+      Thread processorPollingThread = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                pollingEntranceProcessor();
+                setIsAtHead(systemStreamPartition, true);
+              } catch (InterruptedException e) {
+                e.getStackTrace();
+                stop();
+              }
+            }
+          }
+          );
+
+      processorPollingThread.start();
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    private void pollingEntranceProcessor() throws InterruptedException {
+      int messageCnt = 0;
+      while (!this.entranceProcessor.isFinished()) {
+        messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
+        if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft
+                                                                      // limit
+                                                                      // on the
+                                                                      // size 
of
+                                                                      // the
+                                                                      // queue
+          this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition, null, null,
+              this.entranceProcessor.nextEvent()));
+        } else {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+
+      // Send last event
+      this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition, null, null,
+          this.entranceProcessor.nextEvent()));
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingItem.java
 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingItem.java
new file mode 100644
index 0000000..88f0eaa
--- /dev/null
+++ 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingItem.java
@@ -0,0 +1,166 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.SamzaConfigFactory;
+import org.apache.samoa.utils.StreamDestination;
+import org.apache.samoa.utils.SystemsUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * ProcessingItem for Samza which is also a Samza task (StreamTask and 
InitableTask)
+ * 
+ * @author Anh Thu Vu
+ */
/**
 * ProcessingItem for Samza which is also a Samza task (StreamTask and
 * InitableTask)
 */
public class SamzaProcessingItem extends AbstractProcessingItem
    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {

  // Serialization UID (instances are replaced by SerializationProxy anyway,
  // see writeReplace() below).
  private static final long serialVersionUID = 1L;

  private Set<SamzaSystemStream> inputStreams; // input streams: system.stream
  private List<SamzaStream> outputStreams; // streams this PI writes to

  /*
   * Constructors
   */
  // Need this so Samza can initialize a StreamTask; fields are populated
  // later in init() from the deserialized SerializationProxy.
  public SamzaProcessingItem() {
  }

  /*
   * Implement org.apache.samoa.topology.ProcessingItem
   */
  public SamzaProcessingItem(Processor processor, int parallelismHint) {
    super(processor, parallelismHint);
    this.inputStreams = new HashSet<SamzaSystemStream>();
    this.outputStreams = new LinkedList<SamzaStream>();
  }

  /*
   * Simple setters, getters
   */
  public Set<SamzaSystemStream> getInputStreams() {
    return this.inputStreams;
  }

  /*
   * Extends AbstractProcessingItem
   */
  // Registers this PI as a destination of the given stream (with this PI's
  // parallelism and the requested partitioning scheme) and remembers the
  // resulting system stream as an input.
  @Override
  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
    SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this, this
        .getParallelism(), scheme));
    this.inputStreams.add(stream);
    return this;
  }

  /*
   * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode
   */
  @Override
  public int addOutputStream(SamzaStream stream) {
    this.outputStreams.add(stream);
    return this.outputStreams.size();
  }

  public List<SamzaStream> getOutputStreams() {
    return this.outputStreams;
  }

  /*
   * Implement Samza task
   */
  // Restores this PI's processor and output streams from the serialized
  // proxy stored in the job's jar, then initializes them.
  @Override
  public void init(Config config, TaskContext context) throws Exception {
    // If the property is set, configure the Hadoop config home; otherwise,
    // assume we are running in local mode and ignore it
    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
    if (yarnConfHome != null && yarnConfHome.length() > 0)
      SystemsUtils.setHadoopConfigHome(yarnConfHome);

    String filename = config.get(SamzaConfigFactory.FILE_KEY);
    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
    SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
        filename, this.getName());
    this.setProcessor(wrapper.processor);
    this.outputStreams = wrapper.outputStreams;

    // Init Processor and Streams
    this.getProcessor().onCreate(0);
    for (SamzaStream stream : this.outputStreams) {
      stream.onCreate();
    }

  }

  // Routes one incoming event to the wrapped Processor, after pointing all
  // output streams at the current MessageCollector.
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
      throws Exception {
    for (SamzaStream stream : this.outputStreams) {
      stream.setCollector(collector);
    }
    this.getProcessor().process((ContentEvent) envelope.getMessage());
  }

  /*
   * SerializationProxy: only the processor and the output streams are
   * persisted; inputStreams and the task state are rebuilt elsewhere.
   */
  private Object writeReplace() {
    return new SerializationProxy(this);
  }

  private static class SerializationProxy implements Serializable {

    // Serialization UID — must stay fixed for compatibility with already
    // serialized topologies.
    private static final long serialVersionUID = 1534643987559070336L;

    private Processor processor;
    private List<SamzaStream> outputStreams;

    public SerializationProxy(SamzaProcessingItem pi) {
      this.processor = pi.getProcessor();
      this.outputStreams = pi.getOutputStreams();
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingNode.java
 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingNode.java
new file mode 100644
index 0000000..3d221f1
--- /dev/null
+++ 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaProcessingNode.java
@@ -0,0 +1,58 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.IProcessingItem;
+
+/**
+ * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem
+ * 
+ * @author Anh Thu Vu
+ */
+public interface SamzaProcessingNode extends IProcessingItem {
+  /**
+   * Registers an output stream with this processing item
+   * 
+   * @param stream
+   *          the output stream
+   * @return the number of output streams of this processing item
+   */
+  public int addOutputStream(SamzaStream stream);
+
+  /**
+   * Gets the name/id of this processing item
+   * 
+   * @return the name/id of this processing item
+   */
+  // TODO: include getName() and setName() in IProcessingItem and/or
+  // AbstractEPI/PI
+  public String getName();
+
+  /**
+   * Sets the name/id for this processing item
+   * 
+   * @param name
+   *          the name/id of this processing item
+   */
+  // TODO: include getName() and setName() in IProcessingItem and/or
+  // AbstractEPI/PI
+  public void setName(String name);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java
new file mode 100644
index 0000000..da3f8bc
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaStream.java
@@ -0,0 +1,245 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+
+/**
+ * Stream for SAMOA on Samza
+ * 
+ * @author Anh Thu Vu
+ */
/**
 * Stream for SAMOA on Samza. A single logical SAMOA stream may fan out to
 * several SamzaSystemStreams, one per distinct (partitioning scheme,
 * parallelism) combination among its destinations.
 */
public class SamzaStream extends AbstractStream implements Serializable {

  // Serialization UID — must stay fixed for compatibility.
  private static final long serialVersionUID = 1L;

  private static final String DEFAULT_SYSTEM_NAME = "kafka";

  private List<SamzaSystemStream> systemStreams;
  // Collector is provided per-process() call by the Samza task; it is not
  // serializable, hence transient.
  private transient MessageCollector collector;
  private String systemName;

  /*
   * Constructor
   */
  public SamzaStream(IProcessingItem sourcePi) {
    super(sourcePi);
    this.systemName = DEFAULT_SYSTEM_NAME;
    // Get name/id for this stream: "<sourcePiName>-<indexAmongPiOutputs>"
    SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
    int index = samzaPi.addOutputStream(this);
    this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index));
    // init list of SamzaSystemStream
    systemStreams = new ArrayList<SamzaSystemStream>();
  }

  /*
   * System name (Kafka); propagated to all already-created system streams.
   */
  public void setSystemName(String systemName) {
    this.systemName = systemName;
    for (SamzaSystemStream systemStream : systemStreams) {
      systemStream.setSystem(systemName);
    }
  }

  public String getSystemName() {
    return this.systemName;
  }

  /*
   * Add the PI to the list of destinations. Return the name of the
   * corresponding SystemStream.
   */
  public SamzaSystemStream addDestination(StreamDestination destination) {
    PartitioningScheme scheme = destination.getPartitioningScheme();
    int parallelism = destination.getParallelism();

    SamzaSystemStream resultStream = null;
    for (int i = 0; i < systemStreams.size(); i++) {
      // There is an existing SystemStream that matches the settings.
      // Do not create a new one.
      // NOTE(review): the loop does not break on a match, so if several
      // streams matched, the LAST one would be reused — confirm this is
      // intended before adding an early exit.
      if (systemStreams.get(i).isSame(scheme, parallelism)) {
        resultStream = systemStreams.get(i);
      }
    }

    // No existing SystemStream match the requirement
    // Create a new one
    if (resultStream == null) {
      String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size());
      resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism);
      systemStreams.add(resultStream);
    }

    return resultStream;
  }

  public void setCollector(MessageCollector collector) {
    this.collector = collector;
  }

  public MessageCollector getCollector() {
    return this.collector;
  }

  // Creates the actual Samza SystemStream objects; called after
  // deserialization when the task starts up.
  public void onCreate() {
    for (SamzaSystemStream stream : systemStreams) {
      stream.initSystemStream();
    }
  }

  /*
   * Implement Stream interface: an event put on this stream is forwarded to
   * every underlying system stream.
   */
  @Override
  public void put(ContentEvent event) {
    for (SamzaSystemStream stream : systemStreams) {
      stream.send(collector, event);
    }
  }

  public List<SamzaSystemStream> getSystemStreams() {
    return this.systemStreams;
  }

  /**
   * SamzaSystemStream wraps around a Samza SystemStream. It contains the info
   * to create a Samza stream during the constructing process of the topology
   * and will create the actual Samza stream when the topology is submitted
   * (invoking initSystemStream())
   */
  public static class SamzaSystemStream implements Serializable {

    // Serialization UID — must stay fixed for compatibility.
    private static final long serialVersionUID = 1L;
    private String system;
    private String stream;
    private PartitioningScheme scheme;
    private int parallelism;

    // Rebuilt lazily after deserialization (see initSystemStream()/send()).
    private transient SystemStream actualSystemStream = null;

    /*
     * Constructors
     */
    public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) {
      this.system = system;
      this.stream = stream;
      this.scheme = scheme;
      this.parallelism = parallelism;
    }

    public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) {
      this(system, stream, scheme, 1);
    }

    /*
     * Setters
     */
    public void setSystem(String system) {
      this.system = system;
    }

    /*
     * Getters
     */
    public String getSystem() {
      return this.system;
    }

    public String getStream() {
      return this.stream;
    }

    public PartitioningScheme getPartitioningScheme() {
      return this.scheme;
    }

    public int getParallelism() {
      return this.parallelism;
    }

    // Reference comparison on scheme is used below; this matches enum-style
    // constants (scheme is switched on in send()).
    public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
      return (this.scheme == scheme && this.parallelism == parallelismHint);
    }

    /*
     * Init the actual Samza stream
     */
    public void initSystemStream() {
      actualSystemStream = new SystemStream(this.system, this.stream);
    }

    /*
     * Send a ContentEvent, dispatching on the partitioning scheme.
     */
    public void send(MessageCollector collector, ContentEvent contentEvent) {
      if (actualSystemStream == null)
        this.initSystemStream();

      switch (this.scheme) {
      case SHUFFLE:
        this.sendShuffle(collector, contentEvent);
        break;
      case GROUP_BY_KEY:
        this.sendGroupByKey(collector, contentEvent);
        break;
      case BROADCAST:
        this.sendBroadcast(collector, contentEvent);
        break;
      }
    }

    /*
     * Helpers.
     * NOTE(review): sendShuffle and sendBroadcast are synchronized but
     * sendGroupByKey is not — confirm whether this asymmetry is intentional.
     */
    private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) {
      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event));
    }

    // Partitions by the event's key.
    private void sendGroupByKey(MessageCollector collector, ContentEvent event) {
      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
    }

    // Sends one copy of the event to each of the `parallelism` partitions.
    private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) {
      for (int i = 0; i < parallelism; i++) {
        collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaTopology.java 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaTopology.java
new file mode 100644
index 0000000..ae74619
--- /dev/null
+++ 
b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaTopology.java
@@ -0,0 +1,64 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.IProcessingItem;
+
+/**
+ * Topology for Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaTopology extends AbstractTopology {
+  private int procItemCounter;
+
+  public SamzaTopology(String topoName) {
+    super(topoName);
+    procItemCounter = 0;
+  }
+
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelism) {
+    super.addProcessingItem(procItem, parallelism);
+    SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem;
+    samzaPi.setName(this.getTopologyName() + "-" + 
Integer.toString(procItemCounter));
+    procItemCounter++;
+  }
+
+  /*
+   * Gets the set of ProcessingItems, excluding EntrancePIs Used by
+   * SamzaConfigFactory as the config for EntrancePIs and normal PIs are
+   * different
+   */
+  public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception 
{
+    Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>();
+    copiedSet.addAll(this.getProcessingItems());
+    boolean result = copiedSet.removeAll(this.getEntranceProcessingItems());
+    if (!result) {
+      throw new Exception("Failed extracting the set of non-entrance 
processing items");
+    }
+    return copiedSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaConfigFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaConfigFactory.java 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaConfigFactory.java
new file mode 100644
index 0000000..0a2efed
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaConfigFactory.java
@@ -0,0 +1,538 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.impl.SamoaSystemFactory;
+import org.apache.samoa.topology.impl.SamzaEntranceProcessingItem;
+import org.apache.samoa.topology.impl.SamzaProcessingItem;
+import org.apache.samoa.topology.impl.SamzaStream;
+import org.apache.samoa.topology.impl.SamzaTopology;
+import org.apache.samoa.topology.impl.SamzaStream.SamzaSystemStream;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.local.LocalJobFactory;
+import org.apache.samza.job.yarn.YarnJobFactory;
+import org.apache.samza.system.kafka.KafkaSystemFactory;
+
+/**
+ * Generate Configs that will be used to submit Samza jobs from the input 
topology (one config per PI/EntrancePI in the
+ * topology)
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class SamzaConfigFactory {
+  public static final String SYSTEM_NAME = "samoa";
+
+  // DEFAULT VALUES
+  private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
+  private static final String DEFAULT_BROKER_LIST = "localhost:9092";
+
+  // DELIMINATORS
+  public static final String COMMA = ",";
+  public static final String COLON = ":";
+  public static final String DOT = ".";
+  public static final char DOLLAR_SIGN = '$';
+  public static final char QUESTION_MARK = '?';
+
+  // PARTITIONING SCHEMES
+  public static final String SHUFFLE = "shuffle";
+  public static final String KEY = "key";
+  public static final String BROADCAST = "broadcast";
+
+  // PROPERTY KEYS
+  // JOB
+  public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
+  public static final String JOB_NAME_KEY = "job.name";
+  // YARN
+  public static final String YARN_PACKAGE_KEY = "yarn.package.path";
+  public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb";
+  public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb";
+  public static final String CONTAINER_COUNT_KEY = "yarn.container.count";
+  // TASK (SAMZA original)
+  public static final String TASK_CLASS_KEY = "task.class";
+  public static final String TASK_INPUTS_KEY = "task.inputs";
+  // TASK (extra)
+  public static final String FILE_KEY = "task.processor.file";
+  public static final String FILESYSTEM_KEY = "task.processor.filesystem";
+  public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
+  public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs";
+  public static final String YARN_CONF_HOME_KEY = "yarn.config.home";
+  // KAFKA
+  public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect";
+  public static final String BROKER_URI_KEY = "producer.metadata.broker.list";
+  public static final String KAFKA_BATCHSIZE_KEY = 
"producer.batch.num.messages";
+  public static final String KAFKA_PRODUCER_TYPE_KEY = 
"producer.producer.type";
+  // SERDE
+  public static final String SERDE_REGISTRATION_KEY = "kryo.register";
+
+  // Instance variables
+  private boolean isLocalMode;
+  private String zookeeper;
+  private String kafkaBrokerList;
+  private int replicationFactor;
+  private int amMemory;
+  private int containerMemory;
+  private int piPerContainerRatio;
+  private int checkpointFrequency; // in ms
+
+  private String jarPath;
+  private String kryoRegisterFile = null;
+
+  public SamzaConfigFactory() {
+    this.isLocalMode = false;
+    this.zookeeper = DEFAULT_ZOOKEEPER;
+    this.kafkaBrokerList = DEFAULT_BROKER_LIST;
+    this.checkpointFrequency = 60000; // default: 1 minute
+    this.replicationFactor = 1;
+  }
+
+  /*
+   * Setter methods
+   */
+  public SamzaConfigFactory setYarnPackage(String packagePath) {
+    this.jarPath = packagePath;
+    return this;
+  }
+
+  public SamzaConfigFactory setLocalMode(boolean isLocal) {
+    this.isLocalMode = isLocal;
+    return this;
+  }
+
+  public SamzaConfigFactory setZookeeper(String zk) {
+    this.zookeeper = zk;
+    return this;
+  }
+
+  public SamzaConfigFactory setKafka(String brokerList) {
+    this.kafkaBrokerList = brokerList;
+    return this;
+  }
+
+  public SamzaConfigFactory setCheckpointFrequency(int freq) {
+    this.checkpointFrequency = freq;
+    return this;
+  }
+
+  public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
+    this.replicationFactor = replicationFactor;
+    return this;
+  }
+
+  public SamzaConfigFactory setAMMemory(int mem) {
+    this.amMemory = mem;
+    return this;
+  }
+
+  public SamzaConfigFactory setContainerMemory(int mem) {
+    this.containerMemory = mem;
+    return this;
+  }
+
+  public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
+    this.piPerContainerRatio = piPerContainer;
+    return this;
+  }
+
+  public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
+    this.kryoRegisterFile = kryoRegister;
+    return this;
+  }
+
+  /*
+   * Generate a map of all config properties for the input SamzaProcessingItem
+   */
+  private Map<String, String> getMapForPI(SamzaProcessingItem pi, String 
filename, String filesystem) throws Exception {
+    Map<String, String> map = getBasicSystemConfig();
+
+    // Set job name, task class, task inputs (from SamzaProcessingItem)
+    setJobName(map, pi.getName());
+    setTaskClass(map, SamzaProcessingItem.class.getName());
+
+    StringBuilder streamNames = new StringBuilder();
+    boolean first = true;
+    for (SamzaSystemStream stream : pi.getInputStreams()) {
+      if (!first)
+        streamNames.append(COMMA);
+      streamNames.append(stream.getSystem() + DOT + stream.getStream());
+      if (first)
+        first = false;
+    }
+    setTaskInputs(map, streamNames.toString());
+
+    // Processor file
+    setFileName(map, filename);
+    setFileSystem(map, filesystem);
+
+    List<String> nameList = new ArrayList<String>();
+    // Default kafka system: kafka0: sync producer
+    // This system is always required: it is used for checkpointing
+    nameList.add("kafka0");
+    setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);
+    // Output streams: set kafka systems
+    for (SamzaStream stream : pi.getOutputStreams()) {
+      boolean found = false;
+      for (String name : nameList) {
+        if (stream.getSystemName().equals(name)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        nameList.add(stream.getSystemName());
+        setKafkaSystem(map, stream.getSystemName(), this.zookeeper, 
this.kafkaBrokerList, stream.getBatchSize());
+      }
+    }
+    // Input streams: set kafka systems
+    for (SamzaSystemStream stream : pi.getInputStreams()) {
+      boolean found = false;
+      for (String name : nameList) {
+        if (stream.getSystem().equals(name)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        nameList.add(stream.getSystem());
+        setKafkaSystem(map, stream.getSystem(), this.zookeeper, 
this.kafkaBrokerList, 1);
+      }
+    }
+
+    // Checkpointing
+    setValue(map, "task.checkpoint.factory", 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+    setValue(map, "task.checkpoint.system", "kafka0");
+    setValue(map, "task.commit.ms", "1000");
+    setValue(map, "task.checkpoint.replication.factor", 
Integer.toString(this.replicationFactor));
+
+    // Number of containers
+    setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);
+
+    return map;
+  }
+
+  /*
+   * Generate a map of all config properties for the input SamzaProcessingItem
+   */
+  public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem 
epi, String filename, String filesystem) {
+    Map<String, String> map = getBasicSystemConfig();
+
+    // Set job name, task class (from SamzaEntranceProcessingItem)
+    setJobName(map, epi.getName());
+    setTaskClass(map, SamzaEntranceProcessingItem.class.getName());
+
+    // Input for the entrance task (from our custom consumer)
+    setTaskInputs(map, SYSTEM_NAME + "." + epi.getName());
+
+    // Output from entrance task
+    // Since entrancePI should have only 1 output stream
+    // there is no need for checking the batch size, setting different system
+    // names
+    // The custom consumer (samoa system) does not suuport reading from a
+    // specific index
+    // => no need for checkpointing
+    SamzaStream outputStream = (SamzaStream) epi.getOutputStream();
+    // Set samoa system factory
+    setValue(map, "systems." + SYSTEM_NAME + ".samza.factory", 
SamoaSystemFactory.class.getName());
+    // Set Kafka system (only if there is an output stream)
+    if (outputStream != null)
+      setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, 
this.kafkaBrokerList,
+          outputStream.getBatchSize());
+
+    // Processor file
+    setFileName(map, filename);
+    setFileSystem(map, filesystem);
+
+    // Number of containers
+    setNumberOfContainers(map, 1, this.piPerContainerRatio);
+
+    return map;
+  }
+
+  /*
+   * Generate a list of map (of config properties) for all PIs and EPI in the
+   * input topology
+   */
+  public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) 
throws Exception {
+
+    List<Map<String, String>> maps = new ArrayList<Map<String, String>>();
+
+    // File to write serialized objects
+    String filename = topology.getTopologyName() + ".dat";
+    Path dirPath = FileSystems.getDefault().getPath("dat");
+    Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), 
filename);
+    String dstPath = filePath.toString();
+    String resPath;
+    String filesystem;
+    if (this.isLocalMode) {
+      filesystem = SystemsUtils.LOCAL_FS;
+      File dir = dirPath.toFile();
+      if (!dir.exists())
+        FileUtils.forceMkdir(dir);
+    }
+    else {
+      filesystem = SystemsUtils.HDFS;
+    }
+
+    // Correct system name for streams
+    this.setSystemNameForStreams(topology.getStreams());
+
+    // Add all PIs to a collection (map)
+    Map<String, Object> piMap = new HashMap<String, Object>();
+    Set<EntranceProcessingItem> entranceProcessingItems = 
topology.getEntranceProcessingItems();
+    Set<IProcessingItem> processingItems = 
topology.getNonEntranceProcessingItems();
+    for (EntranceProcessingItem epi : entranceProcessingItems) {
+      SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
+      piMap.put(sepi.getName(), sepi);
+    }
+    for (IProcessingItem pi : processingItems) {
+      SamzaProcessingItem spi = (SamzaProcessingItem) pi;
+      piMap.put(spi.getName(), spi);
+    }
+
+    // Serialize all PIs
+    boolean serialized = false;
+    if (this.isLocalMode) {
+      serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, 
dstPath);
+      resPath = dstPath;
+    }
+    else {
+      resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath);
+      serialized = resPath != null;
+    }
+
+    if (!serialized) {
+      throw new Exception("Fail serialize map of PIs to file");
+    }
+
+    // MapConfig for all PIs
+    for (EntranceProcessingItem epi : entranceProcessingItems) {
+      SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
+      maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem));
+    }
+    for (IProcessingItem pi : processingItems) {
+      SamzaProcessingItem spi = (SamzaProcessingItem) pi;
+      maps.add(this.getMapForPI(spi, resPath, filesystem));
+    }
+
+    return maps;
+  }
+
+  /**
+   * Construct a list of MapConfigs for a Topology
+   * 
+   * @return the list of MapConfigs
+   * @throws Exception
+   */
+  public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) 
throws Exception {
+    List<MapConfig> configs = new ArrayList<MapConfig>();
+    List<Map<String, String>> maps = this.getMapsForTopology(topology);
+    for (Map<String, String> map : maps) {
+      configs.add(new MapConfig(map));
+    }
+    return configs;
+  }
+
+  /*
+   *
+   */
+  public void setSystemNameForStreams(Set<Stream> streams) {
+    Map<Integer, String> batchSizeMap = new HashMap<Integer, String>();
+    batchSizeMap.put(1, "kafka0"); // default system with sync producer
+    int counter = 0;
+    for (Stream stream : streams) {
+      SamzaStream samzaStream = (SamzaStream) stream;
+      String systemName = batchSizeMap.get(samzaStream.getBatchSize());
+      if (systemName == null) {
+        counter++;
+        // Add new system
+        systemName = "kafka" + Integer.toString(counter);
+        batchSizeMap.put(samzaStream.getBatchSize(), systemName);
+      }
+      samzaStream.setSystemName(systemName);
+    }
+
+  }
+
+  /*
+   * Generate a map with common properties for PIs and EPI
+   */
+  private Map<String, String> getBasicSystemConfig() {
+    Map<String, String> map = new HashMap<String, String>();
+    // Job & Task
+    if (this.isLocalMode)
+      map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName());
+    else {
+      map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName());
+
+      // yarn
+      map.put(YARN_PACKAGE_KEY, jarPath);
+      map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory));
+      map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
+      map.put(CONTAINER_COUNT_KEY, "1");
+      map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome());
+
+      // Task opts (Heap size = 0.75 container memory)
+      int heapSize = (int) (0.75 * this.containerMemory);
+      map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M 
-XX:+PrintGCDateStamps");
+    }
+
+    map.put(JOB_NAME_KEY, "");
+    map.put(TASK_CLASS_KEY, "");
+    map.put(TASK_INPUTS_KEY, "");
+
+    // register serializer
+    map.put("serializers.registry.kryo.class", 
SamzaKryoSerdeFactory.class.getName());
+
+    // Serde registration
+    setKryoRegistration(map, this.kryoRegisterFile);
+
+    return map;
+  }
+
+  /*
+   * Helper methods to set different properties in the input map
+   */
+  private static void setJobName(Map<String, String> map, String jobName) {
+    map.put(JOB_NAME_KEY, jobName);
+  }
+
+  private static void setFileName(Map<String, String> map, String filename) {
+    map.put(FILE_KEY, filename);
+  }
+
+  private static void setFileSystem(Map<String, String> map, String 
filesystem) {
+    map.put(FILESYSTEM_KEY, filesystem);
+  }
+
+  private static void setTaskClass(Map<String, String> map, String taskClass) {
+    map.put(TASK_CLASS_KEY, taskClass);
+  }
+
+  private static void setTaskInputs(Map<String, String> map, String inputs) {
+    map.put(TASK_INPUTS_KEY, inputs);
+  }
+
+  private static void setKryoRegistration(Map<String, String> map, String 
kryoRegisterFile) {
+    if (kryoRegisterFile != null) {
+      String value = readKryoRegistration(kryoRegisterFile);
+      map.put(SERDE_REGISTRATION_KEY, value);
+    }
+  }
+
+  private static void setNumberOfContainers(Map<String, String> map, int 
parallelism, int piPerContainer) {
+    int res = parallelism / piPerContainer;
+    if (parallelism % piPerContainer != 0)
+      res++;
+    map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
+  }
+
+  private static void setKafkaSystem(Map<String, String> map, String 
systemName, String zk, String brokers,
+      int batchSize) {
+    map.put("systems." + systemName + ".samza.factory", 
KafkaSystemFactory.class.getName());
+    map.put("systems." + systemName + ".samza.msg.serde", "kryo");
+
+    map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk);
+    map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers);
+    map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, 
Integer.toString(batchSize));
+
+    map.put("systems." + systemName + ".samza.offset.default", "oldest");
+
+    if (batchSize > 1) {
+      map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, 
"async");
+    }
+    else {
+      map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync");
+    }
+  }
+
+  // Set custom properties
+  private static void setValue(Map<String, String> map, String key, String 
value) {
+    map.put(key, value);
+  }
+
+  /*
+   * Helper method to parse Kryo registration file
+   */
+  private static String readKryoRegistration(String filePath) {
+    FileInputStream fis = null;
+    Properties props = new Properties();
+    StringBuilder result = new StringBuilder();
+    try {
+      fis = new FileInputStream(filePath);
+      props.load(fis);
+
+      boolean first = true;
+      String value = null;
+      for (String k : props.stringPropertyNames()) {
+        if (!first)
+          result.append(COMMA);
+        else
+          first = false;
+
+        // Need to avoid the dollar sign as samza pass all the properties in
+        // the config to containers via commandline parameters/enviroment
+        // variables
+        // We might escape the dollar sign, but it's more complicated than
+        // replacing it with something else
+        result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
+        value = props.getProperty(k);
+        if (value != null && value.trim().length() > 0) {
+          result.append(COLON);
+          result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
+        }
+      }
+    } catch (FileNotFoundException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } finally {
+      if (fis != null)
+        try {
+          fis.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+    }
+
+    return result.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java
new file mode 100644
index 0000000..90c0f25
--- /dev/null
+++ 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SamzaKryoSerdeFactory.java
@@ -0,0 +1,158 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Implementation of Samza's SerdeFactory that uses Kryo to 
serialize/deserialize objects
+ * 
+ * @author Anh Thu Vu
+ * @param <T>
+ * 
+ */
+public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);
+
+  public static class SamzaKryoSerde<V> implements Serde<V> {
+    private Kryo kryo;
+
+    public SamzaKryoSerde(String registrationInfo) {
+      this.kryo = new Kryo();
+      this.register(registrationInfo);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void register(String registrationInfo) {
+      if (registrationInfo == null)
+        return;
+
+      String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA);
+
+      Class targetClass = null;
+      Class serializerClass = null;
+      Serializer serializer = null;
+
+      for (String info : infoList) {
+        String[] fields = info.split(SamzaConfigFactory.COLON);
+
+        targetClass = null;
+        serializerClass = null;
+        if (fields.length > 0) {
+          try {
+            targetClass = 
Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK,
+                SamzaConfigFactory.DOLLAR_SIGN));
+          } catch (ClassNotFoundException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+        if (fields.length > 1) {
+          try {
+            serializerClass = 
Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK,
+                SamzaConfigFactory.DOLLAR_SIGN));
+          } catch (ClassNotFoundException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+
+        if (targetClass != null) {
+          if (serializerClass == null) {
+            kryo.register(targetClass);
+          }
+          else {
+            serializer = resolveSerializerInstance(kryo, targetClass, (Class<? 
extends Serializer>) serializerClass);
+            kryo.register(targetClass, serializer);
+          }
+        }
+        else {
+          logger.info("Invalid registration info:{}", info);
+        }
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static Serializer resolveSerializerInstance(Kryo k, Class 
superClass,
+        Class<? extends Serializer> serializerClass) {
+      try {
+        try {
+          return serializerClass.getConstructor(Kryo.class, 
Class.class).newInstance(k, superClass);
+        } catch (Exception ex1) {
+          try {
+            return serializerClass.getConstructor(Kryo.class).newInstance(k);
+          } catch (Exception ex2) {
+            try {
+              return 
serializerClass.getConstructor(Class.class).newInstance(superClass);
+            } catch (Exception ex3) {
+              return serializerClass.newInstance();
+            }
+          }
+        }
+      } catch (Exception ex) {
+        throw new IllegalArgumentException("Unable to create serializer \""
+            + serializerClass.getName()
+            + "\" for class: "
+            + superClass.getName(), ex);
+      }
+    }
+
+    /*
+     * Implement Samza Serde interface
+     */
+    @Override
+    public byte[] toBytes(V obj) {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      kryo.writeClassAndObject(output, obj);
+      output.flush();
+      output.close();
+      return bos.toByteArray();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public V fromBytes(byte[] byteArr) {
+      Input input = new Input(byteArr);
+      Object obj = kryo.readClassAndObject(input);
+      input.close();
+      return (V) obj;
+    }
+
+  }
+
+  @Override
+  public Serde<T> getSerde(String name, Config config) {
+    return new 
SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java
new file mode 100644
index 0000000..4932ba8
--- /dev/null
+++ 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SerializableSerializer.java
@@ -0,0 +1,70 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serialize and deserialize objects with Java serialization
+ * 
+ * @author Anh Thu Vu
+ */
+public class SerializableSerializer extends Serializer<Object> {
+  @Override
+  public void write(Kryo kryo, Output output, Object object) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(bos);
+      oos.writeObject(object);
+      oos.flush();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    byte[] ser = bos.toByteArray();
+    output.writeInt(ser.length);
+    output.writeBytes(ser);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Object read(Kryo kryo, Input input, Class c) {
+    int len = input.readInt();
+    byte[] ser = new byte[len];
+    input.readBytes(ser);
+    ByteArrayInputStream bis = new ByteArrayInputStream(ser);
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bis);
+      return ois.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java 
b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
new file mode 100644
index 0000000..6c369ae
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/utils/SystemsUtils.java
@@ -0,0 +1,386 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.FileSystems;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities methods for: - Kafka - HDFS - Handling files on local FS
+ * 
+ * @author Anh Thu Vu
+ */
+public class SystemsUtils {
+  private static final Logger logger = 
LoggerFactory.getLogger(SystemsUtils.class);
+
+  public static final String HDFS = "hdfs";
+  public static final String LOCAL_FS = "local";
+
+  private static final String TEMP_FILE = "samoaTemp";
+  private static final String TEMP_FILE_SUFFIX = ".dat";
+
+  /*
+   * Kafka
+   */
+  private static class KafkaUtils {
+    private static ZkClient zkClient;
+
+    static void setZookeeper(String zk) {
+      zkClient = new ZkClient(zk, 30000, 30000, new 
ZKStringSerializerWrapper());
+    }
+
+    /*
+     * Create Kafka topic/stream
+     */
+    static void createKafkaTopic(String name, int partitions, int replicas) {
+      AdminUtils.createTopic(zkClient, name, partitions, replicas, new 
Properties());
+    }
+
+    static class ZKStringSerializerWrapper implements ZkSerializer {
+      @Override
+      public Object deserialize(byte[] byteArray) throws ZkMarshallingError {
+        return ZKStringSerializer.deserialize(byteArray);
+      }
+
+      @Override
+      public byte[] serialize(Object obj) throws ZkMarshallingError {
+        return ZKStringSerializer.serialize(obj);
+      }
+    }
+  }
+
+  /*
+   * HDFS
+   */
+  private static class HDFSUtils {
+    private static String coreConfPath;
+    private static String hdfsConfPath;
+    private static String configHomePath;
+    private static String samoaDir = null;
+
+    static void setHadoopConfigHome(String hadoopConfPath) {
+      logger.info("Hadoop config home:{}", hadoopConfPath);
+      configHomePath = hadoopConfPath;
+      java.nio.file.Path coreSitePath = 
FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
+      java.nio.file.Path hdfsSitePath = 
FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
+      coreConfPath = coreSitePath.toAbsolutePath().toString();
+      hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
+    }
+
+    static String getNameNodeUri() {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      return config.get("fs.defaultFS");
+    }
+
+    static String getHadoopConfigHome() {
+      return configHomePath;
+    }
+
+    static void setSAMOADir(String dir) {
+      if (dir != null)
+        samoaDir = getNameNodeUri() + dir;
+      else
+        samoaDir = null;
+    }
+
+    static String getDefaultSAMOADir() throws IOException {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      FileSystem fs = FileSystem.get(config);
+      Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa");
+      return defaultDir.toString();
+    }
+
+    static boolean deleteFileIfExist(String absPath) {
+      Path p = new Path(absPath);
+      return deleteFileIfExist(p);
+    }
+
+    static boolean deleteFileIfExist(Path p) {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(config);
+        if (fs.exists(p)) {
+          return fs.delete(p, false);
+        }
+        else
+          return true;
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      return false;
+    }
+
+    /*
+     * Write to HDFS
+     */
+    static String writeToHDFS(File file, String dstPath) {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+      logger.info("Filesystem name:{}", config.get("fs.defaultFS"));
+
+      // Default samoaDir
+      if (samoaDir == null) {
+        try {
+          samoaDir = getDefaultSAMOADir();
+        } catch (IOException e) {
+          e.printStackTrace();
+          return null;
+        }
+      }
+
+      // Setup src and dst paths
+      // java.nio.file.Path tempPath =
+      // FileSystems.getDefault().getPath(samoaDir, dstPath);
+      Path dst = new Path(samoaDir, dstPath);
+      Path src = new Path(file.getAbsolutePath());
+
+      // Delete file if already exists in HDFS
+      if (deleteFileIfExist(dst) == false)
+        return null;
+
+      // Copy to HDFS
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(config);
+        fs.copyFromLocalFile(src, dst);
+      } catch (IOException e) {
+        e.printStackTrace();
+        return null;
+      }
+
+      return dst.toString(); // abs path to file
+    }
+
+    /*
+     * Read from HDFS
+     */
+    static Object deserializeObjectFromFile(String filePath) {
+      logger.info("Deserialize HDFS file:{}", filePath);
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      Path file = new Path(filePath);
+      FSDataInputStream dataInputStream = null;
+      ObjectInputStream ois = null;
+      Object obj = null;
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(config);
+        dataInputStream = fs.open(file);
+        ois = new ObjectInputStream(dataInputStream);
+        obj = ois.readObject();
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        try {
+          if (dataInputStream != null)
+            dataInputStream.close();
+          if (ois != null)
+            ois.close();
+        } catch (IOException ioException) {
+          // TODO auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+      return obj;
+    }
+
+  }
+
+  private static class LocalFileSystemUtils {
+    static boolean serializObjectToFile(Object obj, String fn) {
+      FileOutputStream fos = null;
+      ObjectOutputStream oos = null;
+      try {
+        fos = new FileOutputStream(fn);
+        oos = new ObjectOutputStream(fos);
+        oos.writeObject(obj);
+      } catch (FileNotFoundException e) {
+        e.printStackTrace();
+        return false;
+      } catch (IOException e) {
+        e.printStackTrace();
+        return false;
+      } finally {
+        try {
+          if (fos != null)
+            fos.close();
+          if (oos != null)
+            oos.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+
+      return true;
+    }
+
+    static Object deserializeObjectFromLocalFile(String filename) {
+      logger.info("Deserialize local file:{}", filename);
+      FileInputStream fis = null;
+      ObjectInputStream ois = null;
+      Object obj = null;
+      try {
+        fis = new FileInputStream(filename);
+        ois = new ObjectInputStream(fis);
+        obj = ois.readObject();
+      } catch (IOException e) {
+        // TODO auto-generated catch block
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      } finally {
+        try {
+          if (fis != null)
+            fis.close();
+          if (ois != null)
+            ois.close();
+        } catch (IOException e) {
+          // TODO auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+      return obj;
+    }
+  }
+
+  /*
+   * Create streams
+   */
+  public static void createKafkaTopic(String name, int partitions) {
+    createKafkaTopic(name, partitions, 1);
+  }
+
+  public static void createKafkaTopic(String name, int partitions, int 
replicas) {
+    KafkaUtils.createKafkaTopic(name, partitions, replicas);
+  }
+
+  /*
+   * Serialize object
+   */
+  public static boolean serializeObjectToLocalFileSystem(Object object, String 
path) {
+    return LocalFileSystemUtils.serializObjectToFile(object, path);
+  }
+
+  public static String serializeObjectToHDFS(Object object, String path) {
+    File tmpDatFile;
+    try {
+      tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX);
+      if (serializeObjectToLocalFileSystem(object, 
tmpDatFile.getAbsolutePath())) {
+        return HDFSUtils.writeToHDFS(tmpDatFile, path);
+      }
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  /*
+   * Deserialize object
+   */
+  @SuppressWarnings("unchecked")
+  public static Map<String, Object> deserializeMapFromFile(String filesystem, 
String filename) {
+    Map<String, Object> map;
+    if (filesystem.equals(HDFS)) {
+      map = (Map<String, Object>) 
HDFSUtils.deserializeObjectFromFile(filename);
+    }
+    else {
+      map = (Map<String, Object>) 
LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
+    }
+    return map;
+  }
+
+  public static Object deserializeObjectFromFileAndKey(String filesystem, 
String filename, String key) {
+    Map<String, Object> map = deserializeMapFromFile(filesystem, filename);
+    if (map == null)
+      return null;
+    return map.get(key);
+  }
+
+  /*
+   * Setup
+   */
+  public static void setZookeeper(String zookeeper) {
+    KafkaUtils.setZookeeper(zookeeper);
+  }
+
+  public static void setHadoopConfigHome(String hadoopHome) {
+    HDFSUtils.setHadoopConfigHome(hadoopHome);
+  }
+
+  public static void setSAMOADir(String samoaDir) {
+    HDFSUtils.setSAMOADir(samoaDir);
+  }
+
+  /*
+   * Others
+   */
+  public static String getHDFSNameNodeUri() {
+    return HDFSUtils.getNameNodeUri();
+  }
+
+  public static String getHadoopConfigHome() {
+    return HDFSUtils.getHadoopConfigHome();
+  }
+
+  public static String copyToHDFS(File file, String dstPath) {
+    return HDFSUtils.writeToHDFS(file, dstPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/resources/log4j.xml 
b/samoa-samza/src/main/resources/log4j.xml
index 1704755..5862b88 100644
--- a/samoa-samza/src/main/resources/log4j.xml
+++ b/samoa-samza/src/main/resources/log4j.xml
@@ -51,7 +51,7 @@
     </layout>
   </appender>
 
-  <category name="com.yahoo.labs" additivity="false">
+  <category name="org.apache.samoa" additivity="false">
     <priority value="info" />
     <appender-ref ref="CONSOLE" />
   </category>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-storm/pom.xml b/samoa-storm/pom.xml
index abd351b..a5d1012 100644
--- a/samoa-storm/pom.xml
+++ b/samoa-storm/pom.xml
@@ -31,7 +31,7 @@
 
   <artifactId>samoa-storm</artifactId>
   <parent>
-    <groupId>com.yahoo.labs.samoa</groupId>
+    <groupId>org.apache.samoa</groupId>
     <artifactId>samoa</artifactId>
     <version>0.3.0-SNAPSHOT</version>
   </parent>
@@ -45,12 +45,12 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-api</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-test</artifactId>
       <type>test-jar</type>
       <classifier>test-jar-with-dependencies</classifier>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
deleted file mode 100644
index a5e1cdd..0000000
--- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package com.yahoo.labs.samoa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import com.yahoo.labs.samoa.topology.impl.StormSamoaUtils;
-import com.yahoo.labs.samoa.topology.impl.StormTopology;
-
-/**
- * The main class to execute a SAMOA task in LOCAL mode in Storm.
- * 
- * @author Arinto Murdopo
- * 
- */
-public class LocalStormDoTask {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(LocalStormDoTask.class);
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the arguments
-   */
-  public static void main(String[] args) {
-
-    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-
-    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
-
-    args = tmpArgs.toArray(new String[0]);
-
-    // convert the arguments into Storm topology
-    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
-    String topologyName = stormTopo.getTopologyName();
-
-    Config conf = new Config();
-    // conf.putAll(Utils.readStormConfig());
-    conf.setDebug(false);
-
-    // local mode
-    conf.setMaxTaskParallelism(numWorker);
-
-    backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
-    cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
-
-    backtype.storm.utils.Utils.sleep(600 * 1000);
-
-    cluster.killTopology(topologyName);
-    cluster.shutdown();
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
deleted file mode 100644
index d879171..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Values;
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class
- * 
- * @author Arinto Murdopo
- * 
- */
-class StormBoltStream extends StormStream {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = -5712513402991550847L;
-
-  private OutputCollector outputCollector;
-
-  StormBoltStream(String stormComponentId) {
-    super(stormComponentId);
-  }
-
-  @Override
-  public void put(ContentEvent contentEvent) {
-    outputCollector.emit(this.outputStreamId, new Values(contentEvent, 
contentEvent.getKey()));
-  }
-
-  public void setCollector(OutputCollector outputCollector) {
-    this.outputCollector = outputCollector;
-  }
-
-  // @Override
-  // public void setStreamId(String streamId) {
-  // // TODO Auto-generated method stub
-  // //this.outputStreamId = streamId;
-  // }
-
-  @Override
-  public String getStreamId() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
deleted file mode 100644
index 0537c3f..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * Component factory implementation for samoa-storm
- */
-public final class StormComponentFactory implements ComponentFactory {
-
-  private final Map<String, Integer> processorList;
-
-  public StormComponentFactory() {
-    processorList = new HashMap<>();
-  }
-
-  @Override
-  public ProcessingItem createPi(Processor processor) {
-    return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), 1);
-  }
-
-  @Override
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
-    return new StormEntranceProcessingItem(processor, 
this.getComponentName(processor.getClass()));
-  }
-
-  @Override
-  public Stream createStream(IProcessingItem sourcePi) {
-    StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi;
-    return stormCompatiblePi.createStream();
-  }
-
-  @Override
-  public Topology createTopology(String topoName) {
-    return new StormTopology(topoName);
-  }
-
-  private String getComponentName(Class<? extends Processor> clazz) {
-    StringBuilder componentName = new StringBuilder(clazz.getCanonicalName());
-    String key = componentName.toString();
-    Integer index;
-
-    if (!processorList.containsKey(key)) {
-      index = 1;
-    } else {
-      index = processorList.get(key) + 1;
-    }
-
-    processorList.put(key, index);
-
-    componentName.append('_');
-    componentName.append(index);
-
-    return componentName.toString();
-  }
-
-  @Override
-  public ProcessingItem createPi(Processor processor, int parallelism) {
-    return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), parallelism);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
deleted file mode 100644
index 41ee276..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-/**
- * The main class that used by samoa script to execute SAMOA task.
- * 
- * @author Arinto Murdopo
- * 
- */
-public class StormDoTask {
-  private static final Logger logger = 
LoggerFactory.getLogger(StormDoTask.class);
-  private static String localFlag = "local";
-  private static String clusterFlag = "cluster";
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the arguments
-   */
-  public static void main(String[] args) {
-
-    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-
-    boolean isLocal = isLocal(tmpArgs);
-    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
-
-    args = tmpArgs.toArray(new String[0]);
-
-    // convert the arguments into Storm topology
-    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
-    String topologyName = stormTopo.getTopologyName();
-
-    Config conf = new Config();
-    conf.putAll(Utils.readStormConfig());
-    conf.setDebug(false);
-
-    if (isLocal) {
-      // local mode
-      conf.setMaxTaskParallelism(numWorker);
-
-      backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
-      cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
-
-      backtype.storm.utils.Utils.sleep(600 * 1000);
-
-      cluster.killTopology(topologyName);
-      cluster.shutdown();
-
-    } else {
-      // cluster mode
-      conf.setNumWorkers(numWorker);
-      try {
-        backtype.storm.StormSubmitter.submitTopology(topologyName, conf,
-            stormTopo.getStormBuilder().createTopology());
-      } catch (backtype.storm.generated.AlreadyAliveException ale) {
-        ale.printStackTrace();
-      } catch (backtype.storm.generated.InvalidTopologyException ite) {
-        ite.printStackTrace();
-      }
-    }
-  }
-
-  private static boolean isLocal(List<String> tmpArgs) {
-    ExecutionMode executionMode = ExecutionMode.UNDETERMINED;
-
-    int position = tmpArgs.size() - 1;
-    String flag = tmpArgs.get(position);
-    boolean isLocal = true;
-
-    if (flag.equals(clusterFlag)) {
-      executionMode = ExecutionMode.CLUSTER;
-      isLocal = false;
-    } else if (flag.equals(localFlag)) {
-      executionMode = ExecutionMode.LOCAL;
-      isLocal = true;
-    }
-
-    if (executionMode != ExecutionMode.UNDETERMINED) {
-      tmpArgs.remove(position);
-    }
-
-    return isLocal;
-  }
-
-  private enum ExecutionMode {
-    LOCAL, CLUSTER, UNDETERMINED
-  };
-}

Reply via email to