http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
deleted file mode 100644
index 4b9b7ce..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.JobRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
-import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
-import com.yahoo.labs.samoa.utils.SystemsUtils;
-
-/**
- * This class will submit a list of Samza jobs with the Configs generated from the input topology
- * 
- * @author Anh Thu Vu
- * 
- */
-public class SamzaEngine {
-
-  private static final Logger logger = LoggerFactory.getLogger(SamzaEngine.class);
-
-  /*
-   * Singleton instance
-   */
-  private static SamzaEngine engine = new SamzaEngine();
-
-  private String zookeeper;
-  private String kafka;
-  private int kafkaReplicationFactor;
-  private boolean isLocalMode;
-  private String yarnPackagePath;
-  private String yarnConfHome;
-
-  private String kryoRegisterFile;
-
-  private int amMem;
-  private int containerMem;
-  private int piPerContainerRatio;
-
-  private int checkpointFrequency;
-
-  private void _submitTopology(SamzaTopology topology) {
-
-    // Setup SamzaConfigFactory
-    SamzaConfigFactory configFactory = new SamzaConfigFactory();
-    configFactory.setLocalMode(isLocalMode)
-        .setZookeeper(zookeeper)
-        .setKafka(kafka)
-        .setYarnPackage(yarnPackagePath)
-        .setAMMemory(amMem)
-        .setContainerMemory(containerMem)
-        .setPiPerContainerRatio(piPerContainerRatio)
-        .setKryoRegisterFile(kryoRegisterFile)
-        .setCheckpointFrequency(checkpointFrequency)
-        .setReplicationFactor(kafkaReplicationFactor);
-
-    // Generate the list of Configs
-    List<MapConfig> configs;
-    try {
-      // ConfigFactory generates a list of configs
-      // Serialize a map of PIs and store in a file in the jar at jarFilePath
-      // (in dat/ folder)
-      configs = configFactory.getMapConfigsForTopology(topology);
-    } catch (Exception e) {
-      e.printStackTrace();
-      return;
-    }
-
-    // Create kafka streams
-    Set<Stream> streams = topology.getStreams();
-    for (Stream stream : streams) {
-      SamzaStream samzaStream = (SamzaStream) stream;
-      List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams();
-      for (SamzaSystemStream systemStream : systemStreams) {
-        // all streams should be kafka streams
-        SystemsUtils.createKafkaTopic(systemStream.getStream(), systemStream.getParallelism(), kafkaReplicationFactor);
-      }
-    }
-
-    // Submit the jobs with those configs
-    for (MapConfig config : configs) {
-      logger.info("Config:{}", config);
-      JobRunner jobRunner = new JobRunner(config);
-      jobRunner.run();
-    }
-  }
-
-  private void _setupSystemsUtils() {
-    // Setup Utils
-    if (!isLocalMode)
-      SystemsUtils.setHadoopConfigHome(yarnConfHome);
-    SystemsUtils.setZookeeper(zookeeper);
-  }
-
-  /*
-   * Setter methods
-   */
-  public static SamzaEngine getEngine() {
-    return engine;
-  }
-
-  public SamzaEngine setZooKeeper(String zk) {
-    this.zookeeper = zk;
-    return this;
-  }
-
-  public SamzaEngine setKafka(String kafka) {
-    this.kafka = kafka;
-    return this;
-  }
-
-  public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
-    this.kafkaReplicationFactor = replicationFactor;
-    return this;
-  }
-
-  public SamzaEngine setCheckpointFrequency(int freq) {
-    this.checkpointFrequency = freq;
-    return this;
-  }
-
-  public SamzaEngine setLocalMode(boolean isLocal) {
-    this.isLocalMode = isLocal;
-    return this;
-  }
-
-  public SamzaEngine setYarnPackage(String yarnPackagePath) {
-    this.yarnPackagePath = yarnPackagePath;
-    return this;
-  }
-
-  public SamzaEngine setConfigHome(String configHome) {
-    this.yarnConfHome = configHome;
-    return this;
-  }
-
-  public SamzaEngine setAMMemory(int mem) {
-    this.amMem = mem;
-    return this;
-  }
-
-  public SamzaEngine setContainerMemory(int mem) {
-    this.containerMem = mem;
-    return this;
-  }
-
-  public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
-    this.piPerContainerRatio = piPerContainer;
-    return this;
-  }
-
-  public SamzaEngine setKryoRegisterFile(String registerFile) {
-    this.kryoRegisterFile = registerFile;
-    return this;
-  }
-
-  /**
-   * Submit a list of Samza jobs corresponding to the submitted topology
-   * 
-   * @param topo
-   *          the submitted topology
-   */
-  public static void submitTopology(SamzaTopology topo) {
-    // Setup SystemsUtils
-    engine._setupSystemsUtils();
-
-    // Submit topology
-    engine._submitTopology(topo);
-  }
-}
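
For context on how this deleted engine was driven: a minimal usage sketch, assuming only the fluent setters and the static submitTopology(...) shown in the file above; all concrete values (hosts, paths, memory sizes) are illustrative placeholders.

import com.yahoo.labs.samoa.topology.impl.SamzaEngine;
import com.yahoo.labs.samoa.topology.impl.SamzaTopology;

public class SamzaEngineUsageSketch {
  // Hypothetical driver: configure the singleton engine, then submit a topology.
  public static void run(SamzaTopology topology) {
    SamzaEngine.getEngine()
        .setLocalMode(false)
        .setZooKeeper("localhost:2181")                               // placeholder
        .setKafka("localhost:9092")                                   // placeholder broker list
        .setKafkaReplicationFactor(1)
        .setYarnPackage("hdfs://namenode/samoa/samoa-samza.tar.gz")   // placeholder yarn.package.path
        .setConfigHome("/etc/hadoop/conf")                            // placeholder Hadoop/YARN conf dir
        .setAMMemory(1024)
        .setContainerMemory(1024)
        .setPiPerContainerRatio(2)
        .setCheckpointFrequency(60000)
        .setKryoRegisterFile(null);                                   // optional kryo registration file
    // One Samza job per PI/EntrancePI is generated from the topology and run via JobRunner.
    SamzaEngine.submitTopology(topology);
  }
}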

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
deleted file mode 100644
index bfbd40c..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
+++ /dev/null
@@ -1,229 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.BlockingEnvelopeMap;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
-import com.yahoo.labs.samoa.utils.SystemsUtils;
-
-/**
- * EntranceProcessingItem for Samza which is also a Samza task (StreamTask & InitableTask)
- * 
- * @author Anh Thu Vu
- * 
- */
-public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem
-    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = 7157734520046135039L;
-
-  /*
-   * Constructors
-   */
-  public SamzaEntranceProcessingItem(EntranceProcessor processor) {
-    super(processor);
-  }
-
-  // Need this so Samza can initialize a StreamTask
-  public SamzaEntranceProcessingItem() {
-  }
-
-  /*
-   * Simple setters, getters
-   */
-  @Override
-  public int addOutputStream(SamzaStream stream) {
-    this.setOutputStream(stream);
-    return 1; // entrance PI should have only 1 output stream
-  }
-
-  /*
-   * Serialization
-   */
-  private Object writeReplace() {
-    return new SerializationProxy(this);
-  }
-
-  private static class SerializationProxy implements Serializable {
-    /**
-                * 
-                */
-    private static final long serialVersionUID = 313907132721414634L;
-
-    private EntranceProcessor processor;
-    private SamzaStream outputStream;
-    private String name;
-
-    public SerializationProxy(SamzaEntranceProcessingItem epi) {
-      this.processor = epi.getProcessor();
-      this.outputStream = (SamzaStream) epi.getOutputStream();
-      this.name = epi.getName();
-    }
-  }
-
-  /*
-   * Implement Samza Task
-   */
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
-    if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set, otherwise, assume we are running in local mode and ignore this
-      SystemsUtils.setHadoopConfigHome(yarnConfHome);
-
-    String filename = config.get(SamzaConfigFactory.FILE_KEY);
-    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
-
-    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
-    SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
-        filename, this.getName());
-    this.setOutputStream(wrapper.outputStream);
-    SamzaStream output = (SamzaStream) this.getOutputStream();
-    if (output != null) // if output stream exists, set it up
-      output.onCreate();
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    SamzaStream output = (SamzaStream) this.getOutputStream();
-    if (output == null)
-      return; // if there is no output stream, do nothing
-    output.setCollector(collector);
-    ContentEvent event = (ContentEvent) envelope.getMessage();
-    output.put(event);
-  }
-
-  /*
-   * Implementation of Samza's SystemConsumer to get events from the source and
-   * feed them to the SAMOA system
-   */
-  /*
-   * Current implementation: buffer the incoming events and send a batch of them
-   * when poll() is called by the Samza system.
-   * 
-   * Currently it has a "soft" limit on the size of the buffer: when the buffer
-   * size reaches the limit, the reading thread will sleep for 100ms. A hard
-   * limit can be achieved by overriding the method protected
-   * BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() of
-   * BlockingEnvelopeMap. But then we have to handle the case when the queue is
-   * full.
-   */
-  public static class SamoaSystemConsumer extends BlockingEnvelopeMap {
-
-    private EntranceProcessor entranceProcessor = null;
-    private SystemStreamPartition systemStreamPartition;
-
-    private static final Logger logger = LoggerFactory.getLogger(SamoaSystemConsumer.class);
-
-    public SamoaSystemConsumer(String systemName, Config config) {
-      String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
-      if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set, otherwise, assume we are running in local mode and ignore this
-        SystemsUtils.setHadoopConfigHome(yarnConfHome);
-
-      String filename = config.get(SamzaConfigFactory.FILE_KEY);
-      String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
-      String name = config.get(SamzaConfigFactory.JOB_NAME_KEY);
-      SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
-          filename, name);
-
-      this.entranceProcessor = wrapper.processor;
-      this.entranceProcessor.onCreate(0);
-
-      // Internal stream from SystemConsumer to EntranceTask, so we
-      // need only one partition
-      this.systemStreamPartition = new SystemStreamPartition(systemName, wrapper.name, new Partition(0));
-    }
-
-    @Override
-    public void start() {
-      Thread processorPollingThread = new Thread(
-          new Runnable() {
-            @Override
-            public void run() {
-              try {
-                pollingEntranceProcessor();
-                setIsAtHead(systemStreamPartition, true);
-              } catch (InterruptedException e) {
-                e.getStackTrace();
-                stop();
-              }
-            }
-          }
-          );
-
-      processorPollingThread.start();
-    }
-
-    @Override
-    public void stop() {
-
-    }
-
-    private void pollingEntranceProcessor() throws InterruptedException {
-      int messageCnt = 0;
-      while (!this.entranceProcessor.isFinished()) {
-        messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
-        if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft limit on the size of the queue
-          this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null,
-              this.entranceProcessor.nextEvent()));
-        } else {
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e) {
-            break;
-          }
-        }
-      }
-
-      // Send last event
-      this.put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, null, null,
-          this.entranceProcessor.nextEvent()));
-    }
-
-  }
-}
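
The buffering comment in SamoaSystemConsumer above mentions that a hard limit could be obtained by overriding BlockingEnvelopeMap's newBlockingQueue(). A sketch of what that override might look like, assuming a bounded LinkedBlockingQueue is acceptable; the capacity is illustrative, and a full implementation would still have to deal with put() blocking when the queue is full, which is the caveat the comment raises.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.util.BlockingEnvelopeMap;

// Sketch only: a consumer whose buffer is bounded, replacing the "soft"
// messageCnt < 10000 check in the polling loop above.
public abstract class BoundedEnvelopeMapSketch extends BlockingEnvelopeMap {

  @Override
  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
    // Bounded queue: put(...) now blocks once 10000 envelopes are buffered.
    return new LinkedBlockingQueue<IncomingMessageEnvelope>(10000);
  }
}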

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
deleted file mode 100644
index 6998aa2..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
-import com.yahoo.labs.samoa.utils.SystemsUtils;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-/**
- * ProcessingItem for Samza which is also a Samza task (StreamTask and InitableTask)
- * 
- * @author Anh Thu Vu
- */
-public class SamzaProcessingItem extends AbstractProcessingItem
-    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = 1L;
-
-  private Set<SamzaSystemStream> inputStreams; // input streams: system.stream
-  private List<SamzaStream> outputStreams;
-
-  /*
-   * Constructors
-   */
-  // Need this so Samza can initialize a StreamTask
-  public SamzaProcessingItem() {
-  }
-
-  /*
-   * Implement com.yahoo.labs.samoa.topology.ProcessingItem
-   */
-  public SamzaProcessingItem(Processor processor, int parallelismHint) {
-    super(processor, parallelismHint);
-    this.inputStreams = new HashSet<SamzaSystemStream>();
-    this.outputStreams = new LinkedList<SamzaStream>();
-  }
-
-  /*
-   * Simple setters, getters
-   */
-  public Set<SamzaSystemStream> getInputStreams() {
-    return this.inputStreams;
-  }
-
-  /*
-   * Extends AbstractProcessingItem
-   */
-  @Override
-  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
-    SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new StreamDestination(this, this
-        .getParallelism(), scheme));
-    this.inputStreams.add(stream);
-    return this;
-  }
-
-  /*
-   * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode
-   */
-  @Override
-  public int addOutputStream(SamzaStream stream) {
-    this.outputStreams.add(stream);
-    return this.outputStreams.size();
-  }
-
-  public List<SamzaStream> getOutputStreams() {
-    return this.outputStreams;
-  }
-
-  /*
-   * Implement Samza task
-   */
-  @Override
-  public void init(Config config, TaskContext context) throws Exception {
-    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
-    if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property is set, otherwise, assume we are running in local mode and ignore this
-      SystemsUtils.setHadoopConfigHome(yarnConfHome);
-
-    String filename = config.get(SamzaConfigFactory.FILE_KEY);
-    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
-    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
-    SerializationProxy wrapper = (SerializationProxy) SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
-        filename, this.getName());
-    this.setProcessor(wrapper.processor);
-    this.outputStreams = wrapper.outputStreams;
-
-    // Init Processor and Streams
-    this.getProcessor().onCreate(0);
-    for (SamzaStream stream : this.outputStreams) {
-      stream.onCreate();
-    }
-
-  }
-
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)
-      throws Exception {
-    for (SamzaStream stream : this.outputStreams) {
-      stream.setCollector(collector);
-    }
-    this.getProcessor().process((ContentEvent) envelope.getMessage());
-  }
-
-  /*
-   * SerializationProxy
-   */
-  private Object writeReplace() {
-    return new SerializationProxy(this);
-  }
-
-  private static class SerializationProxy implements Serializable {
-    /**
-                * 
-                */
-    private static final long serialVersionUID = 1534643987559070336L;
-
-    private Processor processor;
-    private List<SamzaStream> outputStreams;
-
-    public SerializationProxy(SamzaProcessingItem pi) {
-      this.processor = pi.getProcessor();
-      this.outputStreams = pi.getOutputStreams();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
deleted file mode 100644
index 8a0e01e..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-
-/**
- * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem
- * 
- * @author Anh Thu Vu
- */
-public interface SamzaProcessingNode extends IProcessingItem {
-  /**
-   * Registers an output stream with this processing item
-   * 
-   * @param stream
-   *          the output stream
-   * @return the number of output streams of this processing item
-   */
-  public int addOutputStream(SamzaStream stream);
-
-  /**
-   * Gets the name/id of this processing item
-   * 
-   * @return the name/id of this processing item
-   */
-  // TODO: include getName() and setName() in IProcessingItem and/or
-  // AbstractEPI/PI
-  public String getName();
-
-  /**
-   * Sets the name/id for this processing item
-   * 
-   * @param name
-   *          the name/id of this processing item
-   */
-  // TODO: include getName() and setName() in IProcessingItem and/or
-  // AbstractEPI/PI
-  public void setName(String name);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
deleted file mode 100644
index 98165d4..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
+++ /dev/null
@@ -1,246 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.AbstractStream;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * Stream for SAMOA on Samza
- * 
- * @author Anh Thu Vu
- */
-public class SamzaStream extends AbstractStream implements Serializable {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = 1L;
-
-  private static final String DEFAULT_SYSTEM_NAME = "kafka";
-
-  private List<SamzaSystemStream> systemStreams;
-  private transient MessageCollector collector;
-  private String systemName;
-
-  /*
-   * Constructor
-   */
-  public SamzaStream(IProcessingItem sourcePi) {
-    super(sourcePi);
-    this.systemName = DEFAULT_SYSTEM_NAME;
-    // Get name/id for this stream
-    SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
-    int index = samzaPi.addOutputStream(this);
-    this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index));
-    // init list of SamzaSystemStream
-    systemStreams = new ArrayList<SamzaSystemStream>();
-  }
-
-  /*
-   * System name (Kafka)
-   */
-  public void setSystemName(String systemName) {
-    this.systemName = systemName;
-    for (SamzaSystemStream systemStream : systemStreams) {
-      systemStream.setSystem(systemName);
-    }
-  }
-
-  public String getSystemName() {
-    return this.systemName;
-  }
-
-  /*
-   * Add the PI to the list of destinations. Return the name of the
-   * corresponding SystemStream.
-   */
-  public SamzaSystemStream addDestination(StreamDestination destination) {
-    PartitioningScheme scheme = destination.getPartitioningScheme();
-    int parallelism = destination.getParallelism();
-
-    SamzaSystemStream resultStream = null;
-    for (int i = 0; i < systemStreams.size(); i++) {
-      // There is an existing SystemStream that matches the settings.
-      // Do not create a new one
-      if (systemStreams.get(i).isSame(scheme, parallelism)) {
-        resultStream = systemStreams.get(i);
-      }
-    }
-
-    // No existing SystemStream matches the requirement
-    // Create a new one
-    if (resultStream == null) {
-      String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size());
-      resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, parallelism);
-      systemStreams.add(resultStream);
-    }
-
-    return resultStream;
-  }
-
-  public void setCollector(MessageCollector collector) {
-    this.collector = collector;
-  }
-
-  public MessageCollector getCollector() {
-    return this.collector;
-  }
-
-  public void onCreate() {
-    for (SamzaSystemStream stream : systemStreams) {
-      stream.initSystemStream();
-    }
-  }
-
-  /*
-   * Implement Stream interface
-   */
-  @Override
-  public void put(ContentEvent event) {
-    for (SamzaSystemStream stream : systemStreams) {
-      stream.send(collector, event);
-    }
-  }
-
-  public List<SamzaSystemStream> getSystemStreams() {
-    return this.systemStreams;
-  }
-
-  /**
-   * SamzaSystemStream wraps around Samza's SystemStream. It contains the info to create a Samza stream during the
-   * construction of the topology and will create the actual Samza stream when the topology is submitted
-   * (invoking initSystemStream())
-   * 
-   * @author Anh Thu Vu
-   */
-  public static class SamzaSystemStream implements Serializable {
-    /**
-                * 
-                */
-    private static final long serialVersionUID = 1L;
-    private String system;
-    private String stream;
-    private PartitioningScheme scheme;
-    private int parallelism;
-
-    private transient SystemStream actualSystemStream = null;
-
-    /*
-     * Constructors
-     */
-    public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) {
-      this.system = system;
-      this.stream = stream;
-      this.scheme = scheme;
-      this.parallelism = parallelism;
-    }
-
-    public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) {
-      this(system, stream, scheme, 1);
-    }
-
-    /*
-     * Setters
-     */
-    public void setSystem(String system) {
-      this.system = system;
-    }
-
-    /*
-     * Getters
-     */
-    public String getSystem() {
-      return this.system;
-    }
-
-    public String getStream() {
-      return this.stream;
-    }
-
-    public PartitioningScheme getPartitioningScheme() {
-      return this.scheme;
-    }
-
-    public int getParallelism() {
-      return this.parallelism;
-    }
-
-    public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
-      return (this.scheme == scheme && this.parallelism == parallelismHint);
-    }
-
-    /*
-     * Init the actual Samza stream
-     */
-    public void initSystemStream() {
-      actualSystemStream = new SystemStream(this.system, this.stream);
-    }
-
-    /*
-     * Send a ContentEvent
-     */
-    public void send(MessageCollector collector, ContentEvent contentEvent) {
-      if (actualSystemStream == null)
-        this.initSystemStream();
-
-      switch (this.scheme) {
-      case SHUFFLE:
-        this.sendShuffle(collector, contentEvent);
-        break;
-      case GROUP_BY_KEY:
-        this.sendGroupByKey(collector, contentEvent);
-        break;
-      case BROADCAST:
-        this.sendBroadcast(collector, contentEvent);
-        break;
-      }
-    }
-
-    /*
-     * Helpers
-     */
-    private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) {
-      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event));
-    }
-
-    private void sendGroupByKey(MessageCollector collector, ContentEvent event) {
-      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
-    }
-
-    private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) {
-      for (int i = 0; i < parallelism; i++) {
-        collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
-      }
-    }
-  }
-}
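
To make the SamzaSystemStream life cycle described in the Javadoc above concrete: a small wiring sketch, assuming both processing items have already been added to a SamzaTopology (which assigns their names) and that a shuffle grouping with the destination's parallelism is wanted; it mirrors what SamzaProcessingItem.addInputStream(...) does internally.

import com.yahoo.labs.samoa.topology.impl.SamzaProcessingItem;
import com.yahoo.labs.samoa.topology.impl.SamzaStream;
import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
import com.yahoo.labs.samoa.utils.PartitioningScheme;
import com.yahoo.labs.samoa.utils.StreamDestination;

public class SamzaStreamWiringSketch {
  public static SamzaSystemStream connect(SamzaProcessingItem source, SamzaProcessingItem destination) {
    // Composition time: the stream registers itself with its source PI;
    // its id becomes "<source-name>-<index>".
    SamzaStream stream = new SamzaStream(source);
    // One SamzaSystemStream is kept per distinct (scheme, parallelism) destination.
    SamzaSystemStream topic = stream.addDestination(
        new StreamDestination(destination, destination.getParallelism(), PartitioningScheme.SHUFFLE));
    // topic.getStream() is the Kafka topic name that SamzaEngine later creates; the actual
    // SystemStream is only instantiated at runtime via onCreate()/initSystemStream().
    return topic;
  }
}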

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
deleted file mode 100644
index a30a3a3..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.HashSet;
-import java.util.Set;
-
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.AbstractTopology;
-
-/**
- * Topology for Samza
- * 
- * @author Anh Thu Vu
- */
-public class SamzaTopology extends AbstractTopology {
-  private int procItemCounter;
-
-  public SamzaTopology(String topoName) {
-    super(topoName);
-    procItemCounter = 0;
-  }
-
-  @Override
-  public void addProcessingItem(IProcessingItem procItem, int parallelism) {
-    super.addProcessingItem(procItem, parallelism);
-    SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem;
-    samzaPi.setName(this.getTopologyName() + "-" + Integer.toString(procItemCounter));
-    procItemCounter++;
-  }
-
-  /*
-   * Gets the set of ProcessingItems, excluding EntrancePIs. Used by
-   * SamzaConfigFactory, as the configs for EntrancePIs and normal PIs are
-   * different.
-   */
-  public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception {
-    Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>();
-    copiedSet.addAll(this.getProcessingItems());
-    boolean result = copiedSet.removeAll(this.getEntranceProcessingItems());
-    if (!result) {
-      throw new Exception("Failed extracting the set of non-entrance processing items");
-    }
-    return copiedSet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
deleted file mode 100644
index 972daa2..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
+++ /dev/null
@@ -1,539 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.local.LocalJobFactory;
-import org.apache.samza.job.yarn.YarnJobFactory;
-import org.apache.samza.system.kafka.KafkaSystemFactory;
-
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.impl.SamoaSystemFactory;
-import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.impl.SamzaProcessingItem;
-import com.yahoo.labs.samoa.topology.impl.SamzaStream;
-import com.yahoo.labs.samoa.topology.impl.SamzaTopology;
-import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
-
-/**
- * Generate Configs that will be used to submit Samza jobs from the input topology (one config per PI/EntrancePI in the
- * topology)
- * 
- * @author Anh Thu Vu
- * 
- */
-public class SamzaConfigFactory {
-  public static final String SYSTEM_NAME = "samoa";
-
-  // DEFAULT VALUES
-  private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
-  private static final String DEFAULT_BROKER_LIST = "localhost:9092";
-
-  // DELIMITERS
-  public static final String COMMA = ",";
-  public static final String COLON = ":";
-  public static final String DOT = ".";
-  public static final char DOLLAR_SIGN = '$';
-  public static final char QUESTION_MARK = '?';
-
-  // PARTITIONING SCHEMES
-  public static final String SHUFFLE = "shuffle";
-  public static final String KEY = "key";
-  public static final String BROADCAST = "broadcast";
-
-  // PROPERTY KEYS
-  // JOB
-  public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
-  public static final String JOB_NAME_KEY = "job.name";
-  // YARN
-  public static final String YARN_PACKAGE_KEY = "yarn.package.path";
-  public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb";
-  public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb";
-  public static final String CONTAINER_COUNT_KEY = "yarn.container.count";
-  // TASK (SAMZA original)
-  public static final String TASK_CLASS_KEY = "task.class";
-  public static final String TASK_INPUTS_KEY = "task.inputs";
-  // TASK (extra)
-  public static final String FILE_KEY = "task.processor.file";
-  public static final String FILESYSTEM_KEY = "task.processor.filesystem";
-  public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
-  public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs";
-  public static final String YARN_CONF_HOME_KEY = "yarn.config.home";
-  // KAFKA
-  public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect";
-  public static final String BROKER_URI_KEY = "producer.metadata.broker.list";
-  public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages";
-  public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type";
-  // SERDE
-  public static final String SERDE_REGISTRATION_KEY = "kryo.register";
-
-  // Instance variables
-  private boolean isLocalMode;
-  private String zookeeper;
-  private String kafkaBrokerList;
-  private int replicationFactor;
-  private int amMemory;
-  private int containerMemory;
-  private int piPerContainerRatio;
-  private int checkpointFrequency; // in ms
-
-  private String jarPath;
-  private String kryoRegisterFile = null;
-
-  public SamzaConfigFactory() {
-    this.isLocalMode = false;
-    this.zookeeper = DEFAULT_ZOOKEEPER;
-    this.kafkaBrokerList = DEFAULT_BROKER_LIST;
-    this.checkpointFrequency = 60000; // default: 1 minute
-    this.replicationFactor = 1;
-  }
-
-  /*
-   * Setter methods
-   */
-  public SamzaConfigFactory setYarnPackage(String packagePath) {
-    this.jarPath = packagePath;
-    return this;
-  }
-
-  public SamzaConfigFactory setLocalMode(boolean isLocal) {
-    this.isLocalMode = isLocal;
-    return this;
-  }
-
-  public SamzaConfigFactory setZookeeper(String zk) {
-    this.zookeeper = zk;
-    return this;
-  }
-
-  public SamzaConfigFactory setKafka(String brokerList) {
-    this.kafkaBrokerList = brokerList;
-    return this;
-  }
-
-  public SamzaConfigFactory setCheckpointFrequency(int freq) {
-    this.checkpointFrequency = freq;
-    return this;
-  }
-
-  public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
-    this.replicationFactor = replicationFactor;
-    return this;
-  }
-
-  public SamzaConfigFactory setAMMemory(int mem) {
-    this.amMemory = mem;
-    return this;
-  }
-
-  public SamzaConfigFactory setContainerMemory(int mem) {
-    this.containerMemory = mem;
-    return this;
-  }
-
-  public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
-    this.piPerContainerRatio = piPerContainer;
-    return this;
-  }
-
-  public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
-    this.kryoRegisterFile = kryoRegister;
-    return this;
-  }
-
-  /*
-   * Generate a map of all config properties for the input SamzaProcessingItem
-   */
-  private Map<String, String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception {
-    Map<String, String> map = getBasicSystemConfig();
-
-    // Set job name, task class, task inputs (from SamzaProcessingItem)
-    setJobName(map, pi.getName());
-    setTaskClass(map, SamzaProcessingItem.class.getName());
-
-    StringBuilder streamNames = new StringBuilder();
-    boolean first = true;
-    for (SamzaSystemStream stream : pi.getInputStreams()) {
-      if (!first)
-        streamNames.append(COMMA);
-      streamNames.append(stream.getSystem() + DOT + stream.getStream());
-      if (first)
-        first = false;
-    }
-    setTaskInputs(map, streamNames.toString());
-
-    // Processor file
-    setFileName(map, filename);
-    setFileSystem(map, filesystem);
-
-    List<String> nameList = new ArrayList<String>();
-    // Default kafka system: kafka0: sync producer
-    // This system is always required: it is used for checkpointing
-    nameList.add("kafka0");
-    setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);
-    // Output streams: set kafka systems
-    for (SamzaStream stream : pi.getOutputStreams()) {
-      boolean found = false;
-      for (String name : nameList) {
-        if (stream.getSystemName().equals(name)) {
-          found = true;
-          break;
-        }
-      }
-      if (!found) {
-        nameList.add(stream.getSystemName());
-        setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize());
-      }
-    }
-    // Input streams: set kafka systems
-    for (SamzaSystemStream stream : pi.getInputStreams()) {
-      boolean found = false;
-      for (String name : nameList) {
-        if (stream.getSystem().equals(name)) {
-          found = true;
-          break;
-        }
-      }
-      if (!found) {
-        nameList.add(stream.getSystem());
-        setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1);
-      }
-    }
-
-    // Checkpointing
-    setValue(map, "task.checkpoint.factory", "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
-    setValue(map, "task.checkpoint.system", "kafka0");
-    setValue(map, "task.commit.ms", "1000");
-    setValue(map, "task.checkpoint.replication.factor", Integer.toString(this.replicationFactor));
-
-    // Number of containers
-    setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);
-
-    return map;
-  }
-
-  /*
-   * Generate a map of all config properties for the input SamzaEntranceProcessingItem
-   */
-  public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) {
-    Map<String, String> map = getBasicSystemConfig();
-
-    // Set job name, task class (from SamzaEntranceProcessingItem)
-    setJobName(map, epi.getName());
-    setTaskClass(map, SamzaEntranceProcessingItem.class.getName());
-
-    // Input for the entrance task (from our custom consumer)
-    setTaskInputs(map, SYSTEM_NAME + "." + epi.getName());
-
-    // Output from entrance task
-    // Since entrancePI should have only 1 output stream
-    // there is no need for checking the batch size, setting different system
-    // names
-    // The custom consumer (samoa system) does not support reading from a
-    // specific index
-    // => no need for checkpointing
-    SamzaStream outputStream = (SamzaStream) epi.getOutputStream();
-    // Set samoa system factory
-    setValue(map, "systems." + SYSTEM_NAME + ".samza.factory", SamoaSystemFactory.class.getName());
-    // Set Kafka system (only if there is an output stream)
-    if (outputStream != null)
-      setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList,
-          outputStream.getBatchSize());
-
-    // Processor file
-    setFileName(map, filename);
-    setFileSystem(map, filesystem);
-
-    // Number of containers
-    setNumberOfContainers(map, 1, this.piPerContainerRatio);
-
-    return map;
-  }
-
-  /*
-   * Generate a list of map (of config properties) for all PIs and EPI in the
-   * input topology
-   */
-  public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) throws Exception {
-
-    List<Map<String, String>> maps = new ArrayList<Map<String, String>>();
-
-    // File to write serialized objects
-    String filename = topology.getTopologyName() + ".dat";
-    Path dirPath = FileSystems.getDefault().getPath("dat");
-    Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename);
-    String dstPath = filePath.toString();
-    String resPath;
-    String filesystem;
-    if (this.isLocalMode) {
-      filesystem = SystemsUtils.LOCAL_FS;
-      File dir = dirPath.toFile();
-      if (!dir.exists())
-        FileUtils.forceMkdir(dir);
-    }
-    else {
-      filesystem = SystemsUtils.HDFS;
-    }
-
-    // Correct system name for streams
-    this.setSystemNameForStreams(topology.getStreams());
-
-    // Add all PIs to a collection (map)
-    Map<String, Object> piMap = new HashMap<String, Object>();
-    Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems();
-    Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems();
-    for (EntranceProcessingItem epi : entranceProcessingItems) {
-      SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
-      piMap.put(sepi.getName(), sepi);
-    }
-    for (IProcessingItem pi : processingItems) {
-      SamzaProcessingItem spi = (SamzaProcessingItem) pi;
-      piMap.put(spi.getName(), spi);
-    }
-
-    // Serialize all PIs
-    boolean serialized = false;
-    if (this.isLocalMode) {
-      serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath);
-      resPath = dstPath;
-    }
-    else {
-      resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath);
-      serialized = resPath != null;
-    }
-
-    if (!serialized) {
-      throw new Exception("Failed to serialize map of PIs to file");
-    }
-
-    // MapConfig for all PIs
-    for (EntranceProcessingItem epi : entranceProcessingItems) {
-      SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
-      maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem));
-    }
-    for (IProcessingItem pi : processingItems) {
-      SamzaProcessingItem spi = (SamzaProcessingItem) pi;
-      maps.add(this.getMapForPI(spi, resPath, filesystem));
-    }
-
-    return maps;
-  }
-
-  /**
-   * Construct a list of MapConfigs for a Topology
-   * 
-   * @return the list of MapConfigs
-   * @throws Exception
-   */
-  public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception {
-    List<MapConfig> configs = new ArrayList<MapConfig>();
-    List<Map<String, String>> maps = this.getMapsForTopology(topology);
-    for (Map<String, String> map : maps) {
-      configs.add(new MapConfig(map));
-    }
-    return configs;
-  }
-
-  /*
-   *
-   */
-  public void setSystemNameForStreams(Set<Stream> streams) {
-    Map<Integer, String> batchSizeMap = new HashMap<Integer, String>();
-    batchSizeMap.put(1, "kafka0"); // default system with sync producer
-    int counter = 0;
-    for (Stream stream : streams) {
-      SamzaStream samzaStream = (SamzaStream) stream;
-      String systemName = batchSizeMap.get(samzaStream.getBatchSize());
-      if (systemName == null) {
-        counter++;
-        // Add new system
-        systemName = "kafka" + Integer.toString(counter);
-        batchSizeMap.put(samzaStream.getBatchSize(), systemName);
-      }
-      samzaStream.setSystemName(systemName);
-    }
-
-  }
-
-  /*
-   * Generate a map with common properties for PIs and EPI
-   */
-  private Map<String, String> getBasicSystemConfig() {
-    Map<String, String> map = new HashMap<String, String>();
-    // Job & Task
-    if (this.isLocalMode)
-      map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName());
-    else {
-      map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName());
-
-      // yarn
-      map.put(YARN_PACKAGE_KEY, jarPath);
-      map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory));
-      map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
-      map.put(CONTAINER_COUNT_KEY, "1");
-      map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome());
-
-      // Task opts (Heap size = 0.75 container memory)
-      int heapSize = (int) (0.75 * this.containerMemory);
-      map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M -XX:+PrintGCDateStamps");
-    }
-
-    map.put(JOB_NAME_KEY, "");
-    map.put(TASK_CLASS_KEY, "");
-    map.put(TASK_INPUTS_KEY, "");
-
-    // register serializer
-    map.put("serializers.registry.kryo.class", SamzaKryoSerdeFactory.class.getName());
-
-    // Serde registration
-    setKryoRegistration(map, this.kryoRegisterFile);
-
-    return map;
-  }
-
-  /*
-   * Helper methods to set different properties in the input map
-   */
-  private static void setJobName(Map<String, String> map, String jobName) {
-    map.put(JOB_NAME_KEY, jobName);
-  }
-
-  private static void setFileName(Map<String, String> map, String filename) {
-    map.put(FILE_KEY, filename);
-  }
-
-  private static void setFileSystem(Map<String, String> map, String filesystem) {
-    map.put(FILESYSTEM_KEY, filesystem);
-  }
-
-  private static void setTaskClass(Map<String, String> map, String taskClass) {
-    map.put(TASK_CLASS_KEY, taskClass);
-  }
-
-  private static void setTaskInputs(Map<String, String> map, String inputs) {
-    map.put(TASK_INPUTS_KEY, inputs);
-  }
-
-  private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) {
-    if (kryoRegisterFile != null) {
-      String value = readKryoRegistration(kryoRegisterFile);
-      map.put(SERDE_REGISTRATION_KEY, value);
-    }
-  }
-
-  private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) {
-    int res = parallelism / piPerContainer;
-    if (parallelism % piPerContainer != 0)
-      res++;
-    map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
-  }
-
-  private static void setKafkaSystem(Map<String, String> map, String systemName, String zk, String brokers,
-      int batchSize) {
-    map.put("systems." + systemName + ".samza.factory", KafkaSystemFactory.class.getName());
-    map.put("systems." + systemName + ".samza.msg.serde", "kryo");
-
-    map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk);
-    map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers);
-    map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, Integer.toString(batchSize));
-
-    map.put("systems." + systemName + ".samza.offset.default", "oldest");
-
-    if (batchSize > 1) {
-      map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "async");
-    }
-    else {
-      map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync");
-    }
-  }
-
-  // Set custom properties
-  private static void setValue(Map<String, String> map, String key, String value) {
-    map.put(key, value);
-  }
-
-  /*
-   * Helper method to parse Kryo registration file
-   */
-  private static String readKryoRegistration(String filePath) {
-    FileInputStream fis = null;
-    Properties props = new Properties();
-    StringBuilder result = new StringBuilder();
-    try {
-      fis = new FileInputStream(filePath);
-      props.load(fis);
-
-      boolean first = true;
-      String value = null;
-      for (String k : props.stringPropertyNames()) {
-        if (!first)
-          result.append(COMMA);
-        else
-          first = false;
-
-        // Need to avoid the dollar sign as Samza passes all the properties in
-        // the config to containers via command-line parameters/environment
-        // variables.
-        // We might escape the dollar sign, but it's more complicated than
-        // replacing it with something else.
-        result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
-        value = props.getProperty(k);
-        if (value != null && value.trim().length() > 0) {
-          result.append(COLON);
-          result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
-        }
-      }
-    } catch (FileNotFoundException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } finally {
-      if (fis != null)
-        try {
-          fis.close();
-        } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
-    }
-
-    return result.toString();
-  }
-}
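
readKryoRegistration(...) above parses a plain Java properties file: each key is a class to register with Kryo and the optional value names its serializer; '$' in nested-class names is rewritten to '?' so the value survives being passed to containers, and SamzaKryoSerdeFactory reverses the substitution when registering. A hypothetical register file and the resulting "kryo.register" property (class names are examples only, and the order of entries is not guaranteed):

# kryo-register.properties (hypothetical)
com.yahoo.labs.samoa.core.ContentEvent=
com.example.MyEvent=com.example.MyEventSerializer

# resulting "kryo.register" config value, roughly:
#   com.yahoo.labs.samoa.core.ContentEvent,com.example.MyEvent:com.example.MyEventSerializer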

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
deleted file mode 100644
index 1874fa3..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
+++ /dev/null
@@ -1,158 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.serializers.SerdeFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Implementation of Samza's SerdeFactory that uses Kryo to serialize/deserialize objects
- * 
- * @author Anh Thu Vu
- * @param <T>
- * 
- */
-public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> {
-
-  private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);
-
-  public static class SamzaKryoSerde<V> implements Serde<V> {
-    private Kryo kryo;
-
-    public SamzaKryoSerde(String registrationInfo) {
-      this.kryo = new Kryo();
-      this.register(registrationInfo);
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private void register(String registrationInfo) {
-      if (registrationInfo == null)
-        return;
-
-      String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA);
-
-      Class targetClass = null;
-      Class serializerClass = null;
-      Serializer serializer = null;
-
-      for (String info : infoList) {
-        String[] fields = info.split(SamzaConfigFactory.COLON);
-
-        targetClass = null;
-        serializerClass = null;
-        if (fields.length > 0) {
-          try {
-            targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK,
-                SamzaConfigFactory.DOLLAR_SIGN));
-          } catch (ClassNotFoundException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-          }
-        }
-        if (fields.length > 1) {
-          try {
-            serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK,
-                SamzaConfigFactory.DOLLAR_SIGN));
-          } catch (ClassNotFoundException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-          }
-        }
-
-        if (targetClass != null) {
-          if (serializerClass == null) {
-            kryo.register(targetClass);
-          }
-          else {
-            serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>) serializerClass);
-            kryo.register(targetClass, serializer);
-          }
-        }
-        else {
-          logger.info("Invalid registration info:{}", info);
-        }
-      }
-    }
-
-    @SuppressWarnings("rawtypes")
-    private static Serializer resolveSerializerInstance(Kryo k, Class superClass,
-        Class<? extends Serializer> serializerClass) {
-      try {
-        try {
-          return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
-        } catch (Exception ex1) {
-          try {
-            return serializerClass.getConstructor(Kryo.class).newInstance(k);
-          } catch (Exception ex2) {
-            try {
-              return serializerClass.getConstructor(Class.class).newInstance(superClass);
-            } catch (Exception ex3) {
-              return serializerClass.newInstance();
-            }
-          }
-        }
-      } catch (Exception ex) {
-        throw new IllegalArgumentException("Unable to create serializer \""
-            + serializerClass.getName()
-            + "\" for class: "
-            + superClass.getName(), ex);
-      }
-    }
-
-    /*
-     * Implement Samza Serde interface
-     */
-    @Override
-    public byte[] toBytes(V obj) {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Output output = new Output(bos);
-      kryo.writeClassAndObject(output, obj);
-      output.flush();
-      output.close();
-      return bos.toByteArray();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public V fromBytes(byte[] byteArr) {
-      Input input = new Input(byteArr);
-      Object obj = kryo.readClassAndObject(input);
-      input.close();
-      return (V) obj;
-    }
-
-  }
-
-  @Override
-  public Serde<T> getSerde(String name, Config config) {
-    return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
-  }
-}
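
For reference, a minimal sketch (not part of this commit) of how the removed serde above would be exercised. From register(), the registration string is a comma-separated list of "targetClass[:serializerClass]" entries, assuming the SamzaConfigFactory.COMMA, COLON, QUESTION_MARK and DOLLAR_SIGN constants hold ",", ":", "?" and "$"; the sample class below is illustrative only.

    import com.yahoo.labs.samoa.utils.SamzaKryoSerdeFactory.SamzaKryoSerde;

    public class KryoSerdeSketch {
      public static void main(String[] args) {
        // One registration entry with no explicit serializer class, so Kryo's
        // default serializer is used for java.util.ArrayList (illustrative).
        SamzaKryoSerde<Object> serde = new SamzaKryoSerde<Object>("java.util.ArrayList");
        byte[] bytes = serde.toBytes(new java.util.ArrayList<String>());
        Object roundTripped = serde.fromBytes(bytes);
        System.out.println(roundTripped.getClass().getName());
      }
    }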

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
deleted file mode 100644
index 4b7850d..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Serialize and deserialize objects with Java serialization
- * 
- * @author Anh Thu Vu
- */
-public class SerializableSerializer extends Serializer<Object> {
-  @Override
-  public void write(Kryo kryo, Output output, Object object) {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(bos);
-      oos.writeObject(object);
-      oos.flush();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    byte[] ser = bos.toByteArray();
-    output.writeInt(ser.length);
-    output.writeBytes(ser);
-  }
-
-  @SuppressWarnings("rawtypes")
-  @Override
-  public Object read(Kryo kryo, Input input, Class c) {
-    int len = input.readInt();
-    byte[] ser = new byte[len];
-    input.readBytes(ser);
-    ByteArrayInputStream bis = new ByteArrayInputStream(ser);
-    try {
-      ObjectInputStream ois = new ObjectInputStream(bis);
-      return ois.readObject();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-}
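
A hedged usage sketch for the removed serializer above: it lets any java.io.Serializable class fall back to plain Java serialization inside Kryo, either registered directly as below or named as the serializer class in a SamzaKryoSerdeFactory registration entry. The choice of java.util.Date is illustrative.

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.yahoo.labs.samoa.utils.SerializableSerializer;

    public class SerializableSerializerSketch {
      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        // Route java.util.Date through Java serialization instead of a
        // field-by-field Kryo serializer.
        kryo.register(java.util.Date.class, new SerializableSerializer());

        Output output = new Output(new byte[4096]);
        kryo.writeObject(output, new java.util.Date());
        java.util.Date copy = kryo.readObject(new Input(output.toBytes()), java.util.Date.class);
        System.out.println(copy);
      }
    }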

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
deleted file mode 100644
index 0b12971..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
+++ /dev/null
@@ -1,386 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.file.FileSystems;
-import java.util.Map;
-import java.util.Properties;
-
-import kafka.admin.AdminUtils;
-import kafka.utils.ZKStringSerializer;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkMarshallingError;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities methods for: - Kafka - HDFS - Handling files on local FS
- * 
- * @author Anh Thu Vu
- */
-public class SystemsUtils {
-  private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class);
-
-  public static final String HDFS = "hdfs";
-  public static final String LOCAL_FS = "local";
-
-  private static final String TEMP_FILE = "samoaTemp";
-  private static final String TEMP_FILE_SUFFIX = ".dat";
-
-  /*
-   * Kafka
-   */
-  private static class KafkaUtils {
-    private static ZkClient zkClient;
-
-    static void setZookeeper(String zk) {
-      zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper());
-    }
-
-    /*
-     * Create Kafka topic/stream
-     */
-    static void createKafkaTopic(String name, int partitions, int replicas) {
-      AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties());
-    }
-
-    static class ZKStringSerializerWrapper implements ZkSerializer {
-      @Override
-      public Object deserialize(byte[] byteArray) throws ZkMarshallingError {
-        return ZKStringSerializer.deserialize(byteArray);
-      }
-
-      @Override
-      public byte[] serialize(Object obj) throws ZkMarshallingError {
-        return ZKStringSerializer.serialize(obj);
-      }
-    }
-  }
-
-  /*
-   * HDFS
-   */
-  private static class HDFSUtils {
-    private static String coreConfPath;
-    private static String hdfsConfPath;
-    private static String configHomePath;
-    private static String samoaDir = null;
-
-    static void setHadoopConfigHome(String hadoopConfPath) {
-      logger.info("Hadoop config home:{}", hadoopConfPath);
-      configHomePath = hadoopConfPath;
-      java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
-      java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
-      coreConfPath = coreSitePath.toAbsolutePath().toString();
-      hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
-    }
-
-    static String getNameNodeUri() {
-      Configuration config = new Configuration();
-      config.addResource(new Path(coreConfPath));
-      config.addResource(new Path(hdfsConfPath));
-
-      return config.get("fs.defaultFS");
-    }
-
-    static String getHadoopConfigHome() {
-      return configHomePath;
-    }
-
-    static void setSAMOADir(String dir) {
-      if (dir != null)
-        samoaDir = getNameNodeUri() + dir;
-      else
-        samoaDir = null;
-    }
-
-    static String getDefaultSAMOADir() throws IOException {
-      Configuration config = new Configuration();
-      config.addResource(new Path(coreConfPath));
-      config.addResource(new Path(hdfsConfPath));
-
-      FileSystem fs = FileSystem.get(config);
-      Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa");
-      return defaultDir.toString();
-    }
-
-    static boolean deleteFileIfExist(String absPath) {
-      Path p = new Path(absPath);
-      return deleteFileIfExist(p);
-    }
-
-    static boolean deleteFileIfExist(Path p) {
-      Configuration config = new Configuration();
-      config.addResource(new Path(coreConfPath));
-      config.addResource(new Path(hdfsConfPath));
-
-      FileSystem fs;
-      try {
-        fs = FileSystem.get(config);
-        if (fs.exists(p)) {
-          return fs.delete(p, false);
-        }
-        else
-          return true;
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-      return false;
-    }
-
-    /*
-     * Write to HDFS
-     */
-    static String writeToHDFS(File file, String dstPath) {
-      Configuration config = new Configuration();
-      config.addResource(new Path(coreConfPath));
-      config.addResource(new Path(hdfsConfPath));
-      logger.info("Filesystem name:{}", config.get("fs.defaultFS"));
-
-      // Default samoaDir
-      if (samoaDir == null) {
-        try {
-          samoaDir = getDefaultSAMOADir();
-        } catch (IOException e) {
-          e.printStackTrace();
-          return null;
-        }
-      }
-
-      // Setup src and dst paths
-      // java.nio.file.Path tempPath =
-      // FileSystems.getDefault().getPath(samoaDir, dstPath);
-      Path dst = new Path(samoaDir, dstPath);
-      Path src = new Path(file.getAbsolutePath());
-
-      // Delete file if already exists in HDFS
-      if (deleteFileIfExist(dst) == false)
-        return null;
-
-      // Copy to HDFS
-      FileSystem fs;
-      try {
-        fs = FileSystem.get(config);
-        fs.copyFromLocalFile(src, dst);
-      } catch (IOException e) {
-        e.printStackTrace();
-        return null;
-      }
-
-      return dst.toString(); // abs path to file
-    }
-
-    /*
-     * Read from HDFS
-     */
-    static Object deserializeObjectFromFile(String filePath) {
-      logger.info("Deserialize HDFS file:{}", filePath);
-      Configuration config = new Configuration();
-      config.addResource(new Path(coreConfPath));
-      config.addResource(new Path(hdfsConfPath));
-
-      Path file = new Path(filePath);
-      FSDataInputStream dataInputStream = null;
-      ObjectInputStream ois = null;
-      Object obj = null;
-      FileSystem fs;
-      try {
-        fs = FileSystem.get(config);
-        dataInputStream = fs.open(file);
-        ois = new ObjectInputStream(dataInputStream);
-        obj = ois.readObject();
-      } catch (IOException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      } catch (ClassNotFoundException e) {
-        try {
-          if (dataInputStream != null)
-            dataInputStream.close();
-          if (ois != null)
-            ois.close();
-        } catch (IOException ioException) {
-          // TODO auto-generated catch block
-          e.printStackTrace();
-        }
-      }
-
-      return obj;
-    }
-
-  }
-
-  private static class LocalFileSystemUtils {
-    static boolean serializObjectToFile(Object obj, String fn) {
-      FileOutputStream fos = null;
-      ObjectOutputStream oos = null;
-      try {
-        fos = new FileOutputStream(fn);
-        oos = new ObjectOutputStream(fos);
-        oos.writeObject(obj);
-      } catch (FileNotFoundException e) {
-        e.printStackTrace();
-        return false;
-      } catch (IOException e) {
-        e.printStackTrace();
-        return false;
-      } finally {
-        try {
-          if (fos != null)
-            fos.close();
-          if (oos != null)
-            oos.close();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-
-      return true;
-    }
-
-    static Object deserializeObjectFromLocalFile(String filename) {
-      logger.info("Deserialize local file:{}", filename);
-      FileInputStream fis = null;
-      ObjectInputStream ois = null;
-      Object obj = null;
-      try {
-        fis = new FileInputStream(filename);
-        ois = new ObjectInputStream(fis);
-        obj = ois.readObject();
-      } catch (IOException e) {
-        // TODO auto-generated catch block
-        e.printStackTrace();
-      } catch (ClassNotFoundException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      } finally {
-        try {
-          if (fis != null)
-            fis.close();
-          if (ois != null)
-            ois.close();
-        } catch (IOException e) {
-          // TODO auto-generated catch block
-          e.printStackTrace();
-        }
-      }
-
-      return obj;
-    }
-  }
-
-  /*
-   * Create streams
-   */
-  public static void createKafkaTopic(String name, int partitions) {
-    createKafkaTopic(name, partitions, 1);
-  }
-
-  public static void createKafkaTopic(String name, int partitions, int replicas) {
-    KafkaUtils.createKafkaTopic(name, partitions, replicas);
-  }
-
-  /*
-   * Serialize object
-   */
-  public static boolean serializeObjectToLocalFileSystem(Object object, String path) {
-    return LocalFileSystemUtils.serializObjectToFile(object, path);
-  }
-
-  public static String serializeObjectToHDFS(Object object, String path) {
-    File tmpDatFile;
-    try {
-      tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX);
-      if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) {
-        return HDFSUtils.writeToHDFS(tmpDatFile, path);
-      }
-    } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    }
-    return null;
-  }
-
-  /*
-   * Deserialize object
-   */
-  @SuppressWarnings("unchecked")
-  public static Map<String, Object> deserializeMapFromFile(String filesystem, String filename) {
-    Map<String, Object> map;
-    if (filesystem.equals(HDFS)) {
-      map = (Map<String, Object>) HDFSUtils.deserializeObjectFromFile(filename);
-    }
-    else {
-      map = (Map<String, Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
-    }
-    return map;
-  }
-
-  public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) {
-    Map<String, Object> map = deserializeMapFromFile(filesystem, filename);
-    if (map == null)
-      return null;
-    return map.get(key);
-  }
-
-  /*
-   * Setup
-   */
-  public static void setZookeeper(String zookeeper) {
-    KafkaUtils.setZookeeper(zookeeper);
-  }
-
-  public static void setHadoopConfigHome(String hadoopHome) {
-    HDFSUtils.setHadoopConfigHome(hadoopHome);
-  }
-
-  public static void setSAMOADir(String samoaDir) {
-    HDFSUtils.setSAMOADir(samoaDir);
-  }
-
-  /*
-   * Others
-   */
-  public static String getHDFSNameNodeUri() {
-    return HDFSUtils.getNameNodeUri();
-  }
-
-  public static String getHadoopConfigHome() {
-    return HDFSUtils.getHadoopConfigHome();
-  }
-
-  public static String copyToHDFS(File file, String dstPath) {
-    return HDFSUtils.writeToHDFS(file, dstPath);
-  }
-}
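
A minimal sketch (not part of this commit) of the local-filesystem round trip the removed SystemsUtils above provides; the path and map contents are illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    import com.yahoo.labs.samoa.utils.SystemsUtils;

    public class SystemsUtilsSketch {
      public static void main(String[] args) {
        Map<String, Object> piMap = new HashMap<String, Object>();
        piMap.put("pi-1", "placeholder");                        // illustrative content

        String path = "/tmp/samoaTemp.dat";                      // illustrative path
        if (SystemsUtils.serializeObjectToLocalFileSystem(piMap, path)) {
          Map<String, Object> restored =
              SystemsUtils.deserializeMapFromFile(SystemsUtils.LOCAL_FS, path);
          Object pi = SystemsUtils.deserializeObjectFromFileAndKey(SystemsUtils.LOCAL_FS, path, "pi-1");
          System.out.println(restored.size() + " entries, pi-1 = " + pi);
        }
      }
    }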

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java b/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java
new file mode 100644
index 0000000..3fe3a25
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/SamzaDoTask.java
@@ -0,0 +1,226 @@
+package org.apache.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.impl.SamzaComponentFactory;
+import org.apache.samoa.topology.impl.SamzaEngine;
+import org.apache.samoa.topology.impl.SamzaTopology;
+import org.apache.samoa.utils.SystemsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+
+/**
+ * Main class to run the task on Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaDoTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(SamzaDoTask.class);
+
+  private static final String LOCAL_MODE = "local";
+  private static final String YARN_MODE = "yarn";
+
+  // FLAGS
+  private static final String YARN_CONF_FLAG = "--yarn_home";
+  private static final String MODE_FLAG = "--mode";
+  private static final String ZK_FLAG = "--zookeeper";
+  private static final String KAFKA_FLAG = "--kafka";
+  private static final String KAFKA_REPLICATION_FLAG = "--kafka_replication_factor";
+  private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency";
+  private static final String JAR_PACKAGE_FLAG = "--jar_package";
+  private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir";
+  private static final String AM_MEMORY_FLAG = "--yarn_am_mem";
+  private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem";
+  private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container";
+
+  private static final String KRYO_REGISTER_FLAG = "--kryo_register";
+
+  // config values
+  private static int kafkaReplicationFactor = 1;
+  private static int checkpointFrequency = 60000;
+  private static String kafka = "localhost:9092";
+  private static String zookeeper = "localhost:2181";
+  private static boolean isLocal = true;
+  private static String yarnConfHome = null;
+  private static String samoaHDFSDir = null;
+  private static String jarPackagePath = null;
+  private static int amMem = 1024;
+  private static int containerMem = 1024;
+  private static int piPerContainer = 2;
+  private static String kryoRegisterFile = null;
+
+  /*
+   * 1. Read arguments 2. Construct topology/task 3. Upload the JAR to HDFS if
+   * we are running on YARN 4. Submit topology to SamzaEngine
+   */
+  public static void main(String[] args) {
+    // Read arguments
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+    parseArguments(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    // Init Task
+    StringBuilder cliString = new StringBuilder();
+    for (int i = 0; i < args.length; i++) {
+      cliString.append(" ").append(args[i]);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+    System.out.println("Command line string = " + cliString.toString());
+
+    Task task = null;
+    try {
+      task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
+      logger.info("Sucessfully instantiating {}", 
task.getClass().getCanonicalName());
+    } catch (Exception e) {
+      logger.error("Fail to initialize the task", e);
+      System.out.println("Fail to initialize the task" + e);
+      return;
+    }
+    task.setFactory(new SamzaComponentFactory());
+    task.init();
+
+    // Upload JAR file to HDFS
+    String hdfsPath = null;
+    if (!isLocal) {
+      Path path = FileSystems.getDefault().getPath(jarPackagePath);
+      hdfsPath = uploadJarToHDFS(path.toFile());
+      if (hdfsPath == null) {
+        System.out.println("Fail uploading JAR file \"" + 
path.toAbsolutePath().toString() + "\" to HDFS.");
+        return;
+      }
+    }
+
+    // Set parameters
+    SamzaEngine.getEngine()
+        .setLocalMode(isLocal)
+        .setZooKeeper(zookeeper)
+        .setKafka(kafka)
+        .setYarnPackage(hdfsPath)
+        .setKafkaReplicationFactor(kafkaReplicationFactor)
+        .setConfigHome(yarnConfHome)
+        .setAMMemory(amMem)
+        .setContainerMemory(containerMem)
+        .setPiPerContainerRatio(piPerContainer)
+        .setKryoRegisterFile(kryoRegisterFile)
+        .setCheckpointFrequency(checkpointFrequency);
+
+    // Submit topology
+    SamzaEngine.submitTopology((SamzaTopology) task.getTopology());
+
+  }
+
+  private static boolean isLocalMode(String mode) {
+    return mode.equals(LOCAL_MODE);
+  }
+
+  private static void parseArguments(List<String> args) {
+    for (int i = args.size() - 1; i >= 0; i--) {
+      String arg = args.get(i).trim();
+      String[] splitted = arg.split("=", 2);
+
+      if (splitted.length >= 2) {
+        // YARN config folder which contains conf/core-site.xml,
+        // conf/hdfs-site.xml, conf/yarn-site.xml
+        if (splitted[0].equals(YARN_CONF_FLAG)) {
+          yarnConfHome = splitted[1];
+          args.remove(i);
+        }
+        // host:port for zookeeper cluster
+        else if (splitted[0].equals(ZK_FLAG)) {
+          zookeeper = splitted[1];
+          args.remove(i);
+        }
+        // host:port,... for kafka broker(s)
+        else if (splitted[0].equals(KAFKA_FLAG)) {
+          kafka = splitted[1];
+          args.remove(i);
+        }
+        // whether we are running Samza in Local mode or YARN mode
+        else if (splitted[0].equals(MODE_FLAG)) {
+          isLocal = isLocalMode(splitted[1]);
+          args.remove(i);
+        }
+        // memory requirement for YARN application master
+        else if (splitted[0].equals(AM_MEMORY_FLAG)) {
+          amMem = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // memory requirement for YARN worker container
+        else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) {
+          containerMem = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // the path to JAR archive that we need to upload to HDFS
+        else if (splitted[0].equals(JAR_PACKAGE_FLAG)) {
+          jarPackagePath = splitted[1];
+          args.remove(i);
+        }
+        // the HDFS dir for SAMOA files
+        else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) {
+          samoaHDFSDir = splitted[1];
+          if (samoaHDFSDir.length() < 1)
+            samoaHDFSDir = null;
+          args.remove(i);
+        }
+        // number of max PI instances per container
+        // this will be used to compute the number of containers
+        // AM will request for the job
+        else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) {
+          piPerContainer = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // kafka streams replication factor
+        else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) {
+          kafkaReplicationFactor = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // checkpoint frequency in ms
+        else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) {
+          checkpointFrequency = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // the file contains registration information for Kryo serializer
+        else if (splitted[0].equals(KRYO_REGISTER_FLAG)) {
+          kryoRegisterFile = splitted[1];
+          args.remove(i);
+        }
+      }
+    }
+  }
+
+  private static String uploadJarToHDFS(File file) {
+    SystemsUtils.setHadoopConfigHome(yarnConfHome);
+    SystemsUtils.setSAMOADir(samoaHDFSDir);
+    return SystemsUtils.copyToHDFS(file, file.getName());
+  }
+}
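
A hedged sketch of driving the new SamzaDoTask entry point with the flags defined above. The hosts, values and the trailing task specification are illustrative only; in practice they arrive on the command line.

    import org.apache.samoa.SamzaDoTask;

    public class SamzaDoTaskSketch {
      public static void main(String[] ignored) {
        String[] args = new String[] {
            "--mode=local",                   // local mode, so no JAR upload to HDFS
            "--zookeeper=localhost:2181",
            "--kafka=localhost:9092",
            "--pi_per_container=4",
            "--checkpoint_frequency=60000",
            "PrequentialEvaluation"           // illustrative task; unmatched args form the task CLI string
        };
        SamzaDoTask.main(args);
      }
    }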

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java
new file mode 100644
index 0000000..7d4f0a1
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamoaSystemFactory.java
@@ -0,0 +1,54 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+
+/**
+ * Implementation of Samza's SystemFactory. Samza will use this factory to get our custom consumer, which takes
+ * the events from the SAMOA EntranceProcessor and feeds them to the EntranceProcessingItem task.
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamoaSystemFactory implements SystemFactory {
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SinglePartitionWithoutOffsetsSystemAdmin();
+  }
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return new SamoaSystemConsumer(systemName, config);
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    throw new SamzaException("This implementation is not supposed to produce anything.");
+  }
+}
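
A hedged sketch of the Samza job configuration that would point a system at the new factory above. The system name is illustrative, and the property pattern follows Samza's usual systems.<system-name>.samza.factory convention rather than anything stated in this commit.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.samoa.topology.impl.SamoaSystemFactory;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;

    public class SamoaSystemWiringSketch {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<String, String>();
        // Map the (illustrative) system name "samoa-entrance" to the factory class.
        props.put("systems.samoa-entrance.samza.factory", SamoaSystemFactory.class.getName());
        Config config = new MapConfig(props);
        System.out.println(config.get("systems.samoa-entrance.samza.factory"));
      }
    }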

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java
new file mode 100644
index 0000000..5eb9de9
--- /dev/null
+++ b/samoa-samza/src/main/java/org/apache/samoa/topology/impl/SamzaComponentFactory.java
@@ -0,0 +1,62 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+/**
+ * Implementation of SAMOA ComponentFactory for Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaComponentFactory implements ComponentFactory {
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor, int parallelism) {
+    return new SamzaProcessingItem(processor, parallelism);
+  }
+
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+    return new SamzaEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new SamzaStream(sourcePi);
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new SamzaTopology(topoName);
+  }
+}
