http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
index c1bf5a2..a855e46 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
@@ -39,210 +39,209 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
  * 
  * @author Anh Thu Vu
  */
-public class SamzaStream extends AbstractStream implements Serializable  {
+public class SamzaStream extends AbstractStream implements Serializable {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 1L;
-
-       private static final String DEFAULT_SYSTEM_NAME = "kafka";
-       
-       private List<SamzaSystemStream> systemStreams;
-       private transient MessageCollector collector;
-       private String systemName;
-       
-       /*
-        * Constructor
-        */
-       public SamzaStream(IProcessingItem sourcePi) {
-               super(sourcePi);
-               this.systemName = DEFAULT_SYSTEM_NAME;
-               // Get name/id for this stream
-               SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
-               int index = samzaPi.addOutputStream(this);
-               this.setStreamId(samzaPi.getName()+"-"+Integer.toString(index));
-               // init list of SamzaSystemStream
-               systemStreams = new ArrayList<SamzaSystemStream>();
-       }
-       
-       /*
-        * System name (Kafka)
-        */
-       public void setSystemName(String systemName) {
-               this.systemName = systemName;
-               for (SamzaSystemStream systemStream:systemStreams) {
-                       systemStream.setSystem(systemName);
-               }
-       }
-
-       public String getSystemName() {
-               return this.systemName;
-       }
-
-       /*
-        * Add the PI to the list of destinations. 
-        * Return the name of the corresponding SystemStream.
-        */
-       public SamzaSystemStream addDestination(StreamDestination destination) {
-               PartitioningScheme scheme = destination.getPartitioningScheme();
-               int parallelism = destination.getParallelism();
-               
-               SamzaSystemStream resultStream = null;
-               for (int i=0; i<systemStreams.size(); i++) {
-                       // There is an existing SystemStream that matches the 
settings. 
-                       // Do not create a new one
-                       if (systemStreams.get(i).isSame(scheme, parallelism)) {
-                               resultStream = systemStreams.get(i);
-                       }
-               }
-               
-               // No existing SystemStream match the requirement
-               // Create a new one 
-               if (resultStream == null) {
-                       String topicName = this.getStreamId() + "-" + 
Integer.toString(systemStreams.size());
-                       resultStream = new 
SamzaSystemStream(this.systemName,topicName,scheme,parallelism);
-                       systemStreams.add(resultStream);
-               }
-               
-               return resultStream;
-       }
-       
-       public void setCollector(MessageCollector collector) {
-               this.collector = collector;
-       }
-       
-       public MessageCollector getCollector(){
-               return this.collector;
-       }
-       
-       public void onCreate() {
-               for (SamzaSystemStream stream:systemStreams) {
-                       stream.initSystemStream();
-               }
-       }
-       
-       /*
-        * Implement Stream interface
-        */
-       @Override
-       public void put(ContentEvent event) {
-               for (SamzaSystemStream stream:systemStreams) {
-                       stream.send(collector,event);
-               }
-       }
-       
-       public List<SamzaSystemStream> getSystemStreams() {
-               return this.systemStreams;
-       }
-       
-       /**
-        * SamzaSystemStream wrap around a Samza's SystemStream 
-        * It contains the info to create a Samza stream during the
-        * constructing process of the topology and
-        * will create the actual Samza stream when the topology is submitted
-        * (invoking initSystemStream())
-        * 
-        * @author Anh Thu Vu
-        */
-       public static class SamzaSystemStream implements Serializable {
-               /**
+  private static final long serialVersionUID = 1L;
+
+  private static final String DEFAULT_SYSTEM_NAME = "kafka";
+
+  private List<SamzaSystemStream> systemStreams;
+  private transient MessageCollector collector;
+  private String systemName;
+
+  /*
+   * Constructor
+   */
+  public SamzaStream(IProcessingItem sourcePi) {
+    super(sourcePi);
+    this.systemName = DEFAULT_SYSTEM_NAME;
+    // Get name/id for this stream
+    SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
+    int index = samzaPi.addOutputStream(this);
+    this.setStreamId(samzaPi.getName() + "-" + Integer.toString(index));
+    // init list of SamzaSystemStream
+    systemStreams = new ArrayList<SamzaSystemStream>();
+  }
+
+  /*
+   * System name (Kafka)
+   */
+  public void setSystemName(String systemName) {
+    this.systemName = systemName;
+    for (SamzaSystemStream systemStream : systemStreams) {
+      systemStream.setSystem(systemName);
+    }
+  }
+
+  public String getSystemName() {
+    return this.systemName;
+  }
+
+  /*
+   * Add the PI to the list of destinations. Return the name of the
+   * corresponding SystemStream.
+   */
+  public SamzaSystemStream addDestination(StreamDestination destination) {
+    PartitioningScheme scheme = destination.getPartitioningScheme();
+    int parallelism = destination.getParallelism();
+
+    SamzaSystemStream resultStream = null;
+    for (int i = 0; i < systemStreams.size(); i++) {
+      // There is an existing SystemStream that matches the settings.
+      // Do not create a new one
+      if (systemStreams.get(i).isSame(scheme, parallelism)) {
+        resultStream = systemStreams.get(i);
+      }
+    }
+
+    // No existing SystemStream match the requirement
+    // Create a new one
+    if (resultStream == null) {
+      String topicName = this.getStreamId() + "-" + 
Integer.toString(systemStreams.size());
+      resultStream = new SamzaSystemStream(this.systemName, topicName, scheme, 
parallelism);
+      systemStreams.add(resultStream);
+    }
+
+    return resultStream;
+  }
+
+  public void setCollector(MessageCollector collector) {
+    this.collector = collector;
+  }
+
+  public MessageCollector getCollector() {
+    return this.collector;
+  }
+
+  public void onCreate() {
+    for (SamzaSystemStream stream : systemStreams) {
+      stream.initSystemStream();
+    }
+  }
+
+  /*
+   * Implement Stream interface
+   */
+  @Override
+  public void put(ContentEvent event) {
+    for (SamzaSystemStream stream : systemStreams) {
+      stream.send(collector, event);
+    }
+  }
+
+  public List<SamzaSystemStream> getSystemStreams() {
+    return this.systemStreams;
+  }
+
+  /**
+   * SamzaSystemStream wrap around a Samza's SystemStream It contains the info
+   * to create a Samza stream during the constructing process of the topology
+   * and will create the actual Samza stream when the topology is submitted
+   * (invoking initSystemStream())
+   * 
+   * @author Anh Thu Vu
+   */
+  public static class SamzaSystemStream implements Serializable {
+    /**
                 * 
                 */
-               private static final long serialVersionUID = 1L;
-               private String system;
-               private String stream;
-               private PartitioningScheme scheme;
-               private int parallelism;
-               
-               private transient SystemStream actualSystemStream = null;
-               
-               /*
-                * Constructors
-                */
-               public SamzaSystemStream(String system, String stream, 
PartitioningScheme scheme, int parallelism) {
-                       this.system = system;
-                       this.stream = stream;
-                       this.scheme = scheme;
-                       this.parallelism = parallelism;
-               }
-               
-               public SamzaSystemStream(String system, String stream, 
PartitioningScheme scheme) {
-                       this(system, stream, scheme, 1);
-               }
-               
-               /*
-                * Setters
-                */
-               public void setSystem(String system) {
-                       this.system = system;
-               }
-               
-               /*
-                * Getters
-                */
-               public String getSystem() {
-                       return this.system;
-               }
-               
-               public String getStream() {
-                       return this.stream;
-               }
-               
-               public PartitioningScheme getPartitioningScheme() {
-                       return this.scheme;
-               }
-               
-               public int getParallelism() {
-                       return this.parallelism;
-               }
-
-               public boolean isSame(PartitioningScheme scheme, int 
parallelismHint) {
-                       return (this.scheme == scheme && this.parallelism == 
parallelismHint);
-               }
-               
-               /*
-                * Init the actual Samza stream
-                */
-               public void initSystemStream() {
-                       actualSystemStream = new SystemStream(this.system, 
this.stream);
-               }
-               
-               /*
-                * Send a ContentEvent
-                */
-               public void send(MessageCollector collector, ContentEvent 
contentEvent) {
-                       if (actualSystemStream == null) 
-                               this.initSystemStream();
-                       
-                       switch(this.scheme) {
-                       case SHUFFLE:
-                               this.sendShuffle(collector, contentEvent);
-                               break;
-                       case GROUP_BY_KEY:
-                               this.sendGroupByKey(collector, contentEvent);
-                               break;
-                       case BROADCAST:
-                               this.sendBroadcast(collector, contentEvent);
-                               break;
-                       }
-               }
-               
-               /*
-                * Helpers
-                */
-               private synchronized void sendShuffle(MessageCollector 
collector, ContentEvent event) {
-                       collector.send(new 
OutgoingMessageEnvelope(this.actualSystemStream, event));
-               }
-               
-               private void sendGroupByKey(MessageCollector collector, 
ContentEvent event) {
-                       collector.send(new 
OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
-               }
-
-               private synchronized void sendBroadcast(MessageCollector 
collector, ContentEvent event) {
-                       for (int i=0; i<parallelism; i++) {
-                               collector.send(new 
OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
-                       }
-               }       
-       }
+    private static final long serialVersionUID = 1L;
+    private String system;
+    private String stream;
+    private PartitioningScheme scheme;
+    private int parallelism;
+
+    private transient SystemStream actualSystemStream = null;
+
+    /*
+     * Constructors
+     */
+    public SamzaSystemStream(String system, String stream, PartitioningScheme 
scheme, int parallelism) {
+      this.system = system;
+      this.stream = stream;
+      this.scheme = scheme;
+      this.parallelism = parallelism;
+    }
+
+    public SamzaSystemStream(String system, String stream, PartitioningScheme 
scheme) {
+      this(system, stream, scheme, 1);
+    }
+
+    /*
+     * Setters
+     */
+    public void setSystem(String system) {
+      this.system = system;
+    }
+
+    /*
+     * Getters
+     */
+    public String getSystem() {
+      return this.system;
+    }
+
+    public String getStream() {
+      return this.stream;
+    }
+
+    public PartitioningScheme getPartitioningScheme() {
+      return this.scheme;
+    }
+
+    public int getParallelism() {
+      return this.parallelism;
+    }
+
+    public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
+      return (this.scheme == scheme && this.parallelism == parallelismHint);
+    }
+
+    /*
+     * Init the actual Samza stream
+     */
+    public void initSystemStream() {
+      actualSystemStream = new SystemStream(this.system, this.stream);
+    }
+
+    /*
+     * Send a ContentEvent
+     */
+    public void send(MessageCollector collector, ContentEvent contentEvent) {
+      if (actualSystemStream == null)
+        this.initSystemStream();
+
+      switch (this.scheme) {
+      case SHUFFLE:
+        this.sendShuffle(collector, contentEvent);
+        break;
+      case GROUP_BY_KEY:
+        this.sendGroupByKey(collector, contentEvent);
+        break;
+      case BROADCAST:
+        this.sendBroadcast(collector, contentEvent);
+        break;
+      }
+    }
+
+    /*
+     * Helpers
+     */
+    private synchronized void sendShuffle(MessageCollector collector, 
ContentEvent event) {
+      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, 
event));
+    }
+
+    private void sendGroupByKey(MessageCollector collector, ContentEvent 
event) {
+      collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, 
event.getKey(), null, event));
+    }
+
+    private synchronized void sendBroadcast(MessageCollector collector, 
ContentEvent event) {
+      for (int i = 0; i < parallelism; i++) {
+        collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, 
null, event));
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
index a169bc2..4e52966 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
@@ -32,33 +32,33 @@ import com.yahoo.labs.samoa.topology.AbstractTopology;
  * @author Anh Thu Vu
  */
 public class SamzaTopology extends AbstractTopology {
-       private int procItemCounter;
-       
-       public SamzaTopology(String topoName) {
-               super(topoName);
-               procItemCounter = 0;
-       }
-       
-       @Override
-       public void addProcessingItem(IProcessingItem procItem, int 
parallelism) {
-               super.addProcessingItem(procItem, parallelism);
-               SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem;
-               
samzaPi.setName(this.getTopologyName()+"-"+Integer.toString(procItemCounter));
-               procItemCounter++;
-       }
-       
-       /*
-        * Gets the set of ProcessingItems, excluding EntrancePIs
-        * Used by SamzaConfigFactory as the config for EntrancePIs and
-        * normal PIs are different
-        */
-       public Set<IProcessingItem> getNonEntranceProcessingItems() throws 
Exception {
-               Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>();
-               copiedSet.addAll(this.getProcessingItems());
-               boolean result = 
copiedSet.removeAll(this.getEntranceProcessingItems());
-               if (!result) {
-                       throw new Exception("Failed extracting the set of 
non-entrance processing items");
-               }
-               return copiedSet;
-       }
+  private int procItemCounter;
+
+  public SamzaTopology(String topoName) {
+    super(topoName);
+    procItemCounter = 0;
+  }
+
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelism) {
+    super.addProcessingItem(procItem, parallelism);
+    SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem;
+    samzaPi.setName(this.getTopologyName() + "-" + 
Integer.toString(procItemCounter));
+    procItemCounter++;
+  }
+
+  /*
+   * Gets the set of ProcessingItems, excluding EntrancePIs Used by
+   * SamzaConfigFactory as the config for EntrancePIs and normal PIs are
+   * different
+   */
+  public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception 
{
+    Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>();
+    copiedSet.addAll(this.getProcessingItems());
+    boolean result = copiedSet.removeAll(this.getEntranceProcessingItems());
+    if (!result) {
+      throw new Exception("Failed extracting the set of non-entrance 
processing items");
+    }
+    return copiedSet;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
index 56427d0..03f35f1 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
@@ -51,482 +51,489 @@ import com.yahoo.labs.samoa.topology.impl.SamzaTopology;
 import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
 
 /**
- * Generate Configs that will be used to submit Samza jobs
- * from the input topology (one config per PI/EntrancePI in 
- * the topology)
+ * Generate Configs that will be used to submit Samza jobs from the input
+ * topology (one config per PI/EntrancePI in the topology)
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SamzaConfigFactory {
-       public static final String SYSTEM_NAME = "samoa";
-       
-       // DEFAULT VALUES
-       private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
-       private static final String DEFAULT_BROKER_LIST = "localhost:9092";
-
-       // DELIMINATORS
-       public static final String COMMA = ",";
-       public static final String COLON = ":";
-       public static final String DOT = ".";
-       public static final char DOLLAR_SIGN = '$';
-       public static final char QUESTION_MARK = '?';
-       
-       // PARTITIONING SCHEMES
-       public static final String SHUFFLE = "shuffle";
-       public static final String KEY = "key";
-       public static final String BROADCAST = "broadcast";
-
-       // PROPERTY KEYS
-       // JOB
-       public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
-       public static final String JOB_NAME_KEY = "job.name";
-       // YARN 
-       public static final String YARN_PACKAGE_KEY = "yarn.package.path";
-       public static final String CONTAINER_MEMORY_KEY = 
"yarn.container.memory.mb";
-       public static final String AM_MEMORY_KEY = 
"yarn.am.container.memory.mb";
-       public static final String CONTAINER_COUNT_KEY = "yarn.container.count";
-       // TASK (SAMZA original)
-       public static final String TASK_CLASS_KEY = "task.class";
-       public static final String TASK_INPUTS_KEY = "task.inputs";
-       // TASK (extra)
-       public static final String FILE_KEY = "task.processor.file";
-       public static final String FILESYSTEM_KEY = "task.processor.filesystem";
-       public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
-       public static final String ENTRANCE_OUTPUT_KEY = 
"task.entrance.outputs";
-       public static final String YARN_CONF_HOME_KEY = "yarn.config.home";
-       // KAFKA
-       public static final String ZOOKEEPER_URI_KEY = 
"consumer.zookeeper.connect";
-       public static final String BROKER_URI_KEY = 
"producer.metadata.broker.list";
-       public static final String KAFKA_BATCHSIZE_KEY = 
"producer.batch.num.messages";
-       public static final String KAFKA_PRODUCER_TYPE_KEY = 
"producer.producer.type";
-       // SERDE
-       public static final String SERDE_REGISTRATION_KEY = "kryo.register";
-
-       // Instance variables
-       private boolean isLocalMode;
-       private String zookeeper;
-       private String kafkaBrokerList;
-       private int replicationFactor;
-       private int amMemory;
-       private int containerMemory;
-       private int piPerContainerRatio;
-       private int checkpointFrequency; // in ms
-
-       private String jarPath;
-       private String kryoRegisterFile = null;
-
-       public SamzaConfigFactory() {
-               this.isLocalMode = false;
-               this.zookeeper = DEFAULT_ZOOKEEPER;
-               this.kafkaBrokerList = DEFAULT_BROKER_LIST;
-               this.checkpointFrequency = 60000; // default: 1 minute
-               this.replicationFactor = 1;
-       }
-
-       /*
-        * Setter methods
-        */
-       public SamzaConfigFactory setYarnPackage(String packagePath) {
-               this.jarPath = packagePath;
-               return this;
-       }
-
-       public SamzaConfigFactory setLocalMode(boolean isLocal) {
-               this.isLocalMode = isLocal;
-               return this;
-       }
-
-       public SamzaConfigFactory setZookeeper(String zk) {
-               this.zookeeper = zk;
-               return this;
-       }
-
-       public SamzaConfigFactory setKafka(String brokerList) {
-               this.kafkaBrokerList = brokerList;
-               return this;
-       }
-       
-       public SamzaConfigFactory setCheckpointFrequency(int freq) {
-               this.checkpointFrequency = freq;
-               return this;
-       }
-       
-       public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
-               this.replicationFactor = replicationFactor;
-               return this;
-       }
-
-       public SamzaConfigFactory setAMMemory(int mem) {
-               this.amMemory = mem;
-               return this;
-       }
-
-       public SamzaConfigFactory setContainerMemory(int mem) {
-               this.containerMemory = mem;
-               return this;
-       }
-       
-       public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
-               this.piPerContainerRatio = piPerContainer;
-               return this;
-       }
-
-       public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
-               this.kryoRegisterFile = kryoRegister;
-               return this;
-       }
-
-       /*
-        * Generate a map of all config properties for the input 
SamzaProcessingItem
-        */
-       private Map<String,String> getMapForPI(SamzaProcessingItem pi, String 
filename, String filesystem) throws Exception {
-               Map<String,String> map = getBasicSystemConfig();
-
-               // Set job name, task class, task inputs (from 
SamzaProcessingItem)
-               setJobName(map, pi.getName());
-               setTaskClass(map, SamzaProcessingItem.class.getName());
-
-               StringBuilder streamNames = new StringBuilder();
-               boolean first = true;
-               for(SamzaSystemStream stream:pi.getInputStreams()) {
-                       if (!first) streamNames.append(COMMA);
-                       
streamNames.append(stream.getSystem()+DOT+stream.getStream());
-                       if (first) first = false;
-               }
-               setTaskInputs(map, streamNames.toString());
-
-               // Processor file
-               setFileName(map, filename);
-               setFileSystem(map, filesystem);
-               
-               List<String> nameList = new ArrayList<String>();
-               // Default kafka system: kafka0: sync producer
-               // This system is always required: it is used for checkpointing
-               nameList.add("kafka0");
-               setKafkaSystem(map, "kafka0", this.zookeeper, 
this.kafkaBrokerList, 1);
-               // Output streams: set kafka systems
-               for (SamzaStream stream:pi.getOutputStreams()) {
-                       boolean found = false;
-                       for (String name:nameList) {
-                               if (stream.getSystemName().equals(name)) {
-                                       found = true;
-                                       break;
-                               }
-                       }
-                       if (!found) {
-                               nameList.add(stream.getSystemName());
-                               setKafkaSystem(map, stream.getSystemName(), 
this.zookeeper, this.kafkaBrokerList, stream.getBatchSize());
-                       }
-               }
-               // Input streams: set kafka systems
-               for (SamzaSystemStream stream:pi.getInputStreams()) {
-                       boolean found = false;
-                       for (String name:nameList) {
-                               if (stream.getSystem().equals(name)) {
-                                       found = true;
-                                       break;
-                               }
-                       }
-                       if (!found) {
-                               nameList.add(stream.getSystem());
-                               setKafkaSystem(map, stream.getSystem(), 
this.zookeeper, this.kafkaBrokerList, 1);
-                       }
-               }
-               
-               // Checkpointing
-               
setValue(map,"task.checkpoint.factory","org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
-               setValue(map,"task.checkpoint.system","kafka0");
-               setValue(map,"task.commit.ms","1000");
-               
setValue(map,"task.checkpoint.replication.factor",Integer.toString(this.replicationFactor));
-               
-               // Number of containers
-               setNumberOfContainers(map, pi.getParallelism(), 
this.piPerContainerRatio);
-
-               return map;
-       }
-
-       /*
-        * Generate a map of all config properties for the input 
SamzaProcessingItem
-        */
-       public Map<String,String> 
getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String 
filesystem) {
-               Map<String,String> map = getBasicSystemConfig();
-
-               // Set job name, task class (from SamzaEntranceProcessingItem)
-               setJobName(map, epi.getName());
-               setTaskClass(map, SamzaEntranceProcessingItem.class.getName());
-
-               // Input for the entrance task (from our custom consumer)
-               setTaskInputs(map, SYSTEM_NAME+"."+epi.getName());
-
-               // Output from entrance task
-               // Since entrancePI should have only 1 output stream
-               // there is no need for checking the batch size, setting 
different system names
-               // The custom consumer (samoa system) does not suuport reading 
from a specific index
-               // => no need for checkpointing
-               SamzaStream outputStream = (SamzaStream)epi.getOutputStream();
-               // Set samoa system factory
-               setValue(map, "systems."+SYSTEM_NAME+".samza.factory", 
SamoaSystemFactory.class.getName());
-               // Set Kafka system (only if there is an output stream)
-               if (outputStream != null)
-                       setKafkaSystem(map, outputStream.getSystemName(), 
this.zookeeper, this.kafkaBrokerList, outputStream.getBatchSize());
-
-               // Processor file
-               setFileName(map, filename);
-               setFileSystem(map, filesystem);
-               
-               // Number of containers
-               setNumberOfContainers(map, 1, this.piPerContainerRatio);
-
-               return map;
-       }
-
-       /*
-        * Generate a list of map (of config properties) for all PIs and EPI in 
-        * the input topology
-        */
-       public List<Map<String,String>> getMapsForTopology(SamzaTopology 
topology) throws Exception {
-
-               List<Map<String,String>> maps = new 
ArrayList<Map<String,String>>();
-
-               // File to write serialized objects
-               String filename = topology.getTopologyName() + ".dat";
-               Path dirPath = FileSystems.getDefault().getPath("dat");
-               Path filePath= 
FileSystems.getDefault().getPath(dirPath.toString(), filename);
-               String dstPath = filePath.toString();
-               String resPath;
-               String filesystem;
-               if (this.isLocalMode) {
-                       filesystem = SystemsUtils.LOCAL_FS;
-                       File dir = dirPath.toFile();
-                       if (!dir.exists()) 
-                               FileUtils.forceMkdir(dir);
-               }
-               else {
-                       filesystem = SystemsUtils.HDFS;
-               }
-
-               // Correct system name for streams
-               this.setSystemNameForStreams(topology.getStreams());
-               
-               // Add all PIs to a collection (map)
-               Map<String,Object> piMap = new HashMap<String,Object>();
-               Set<EntranceProcessingItem> entranceProcessingItems = 
topology.getEntranceProcessingItems();
-               Set<IProcessingItem> processingItems = 
topology.getNonEntranceProcessingItems();
-               for(EntranceProcessingItem epi:entranceProcessingItems) {
-                       SamzaEntranceProcessingItem sepi = 
(SamzaEntranceProcessingItem) epi;
-                       piMap.put(sepi.getName(), sepi);
-               }
-               for(IProcessingItem pi:processingItems) {
-                       SamzaProcessingItem spi = (SamzaProcessingItem) pi;
-                       piMap.put(spi.getName(), spi);
-               }
-
-               // Serialize all PIs
-               boolean serialized = false;
-               if (this.isLocalMode) {
-                       serialized = 
SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath);
-                       resPath = dstPath;
-               }
-               else {
-                       resPath = SystemsUtils.serializeObjectToHDFS(piMap, 
dstPath);
-                       serialized = resPath != null;
-               }
-
-               if (!serialized) {
-                       throw new Exception("Fail serialize map of PIs to 
file");
-               }
-
-               // MapConfig for all PIs
-               for(EntranceProcessingItem epi:entranceProcessingItems) {
-                       SamzaEntranceProcessingItem sepi = 
(SamzaEntranceProcessingItem) epi;
-                       maps.add(this.getMapForEntrancePI(sepi, resPath, 
filesystem));
-               }
-               for(IProcessingItem pi:processingItems) {
-                       SamzaProcessingItem spi = (SamzaProcessingItem) pi;
-                       maps.add(this.getMapForPI(spi, resPath, filesystem));
-               }
-
-               return maps;
-       }
-
-       /**
-        * Construct a list of MapConfigs for a Topology
-        * @return the list of MapConfigs
-        * @throws Exception 
-        */
-       public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) 
throws Exception {
-               List<MapConfig> configs = new ArrayList<MapConfig>();
-               List<Map<String,String>> maps = 
this.getMapsForTopology(topology);
-               for(Map<String,String> map:maps) {
-                       configs.add(new MapConfig(map));
-               }
-               return configs;
-       }
-       
-       /*
+  public static final String SYSTEM_NAME = "samoa";
+
+  // DEFAULT VALUES
+  private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
+  private static final String DEFAULT_BROKER_LIST = "localhost:9092";
+
+  // DELIMITERS
+  public static final String COMMA = ",";
+  public static final String COLON = ":";
+  public static final String DOT = ".";
+  public static final char DOLLAR_SIGN = '$';
+  public static final char QUESTION_MARK = '?';
+
+  // PARTITIONING SCHEMES
+  public static final String SHUFFLE = "shuffle";
+  public static final String KEY = "key";
+  public static final String BROADCAST = "broadcast";
+
+  // PROPERTY KEYS
+  // JOB
+  public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
+  public static final String JOB_NAME_KEY = "job.name";
+  // YARN
+  public static final String YARN_PACKAGE_KEY = "yarn.package.path";
+  public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb";
+  public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb";
+  public static final String CONTAINER_COUNT_KEY = "yarn.container.count";
+  // TASK (SAMZA original)
+  public static final String TASK_CLASS_KEY = "task.class";
+  public static final String TASK_INPUTS_KEY = "task.inputs";
+  // TASK (extra)
+  public static final String FILE_KEY = "task.processor.file";
+  public static final String FILESYSTEM_KEY = "task.processor.filesystem";
+  public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
+  public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs";
+  public static final String YARN_CONF_HOME_KEY = "yarn.config.home";
+  // KAFKA
+  public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect";
+  public static final String BROKER_URI_KEY = "producer.metadata.broker.list";
+  public static final String KAFKA_BATCHSIZE_KEY = 
"producer.batch.num.messages";
+  public static final String KAFKA_PRODUCER_TYPE_KEY = 
"producer.producer.type";
+  // SERDE
+  public static final String SERDE_REGISTRATION_KEY = "kryo.register";
+
+  // Instance variables
+  private boolean isLocalMode;
+  private String zookeeper;
+  private String kafkaBrokerList;
+  private int replicationFactor;
+  private int amMemory;
+  private int containerMemory;
+  private int piPerContainerRatio;
+  private int checkpointFrequency; // in ms
+
+  private String jarPath;
+  private String kryoRegisterFile = null;
+
+  public SamzaConfigFactory() {
+    this.isLocalMode = false;
+    this.zookeeper = DEFAULT_ZOOKEEPER;
+    this.kafkaBrokerList = DEFAULT_BROKER_LIST;
+    this.checkpointFrequency = 60000; // default: 1 minute
+    this.replicationFactor = 1;
+  }
+
+  /*
+   * Setter methods
+   */
+  public SamzaConfigFactory setYarnPackage(String packagePath) {
+    this.jarPath = packagePath;
+    return this;
+  }
+
+  public SamzaConfigFactory setLocalMode(boolean isLocal) {
+    this.isLocalMode = isLocal;
+    return this;
+  }
+
+  public SamzaConfigFactory setZookeeper(String zk) {
+    this.zookeeper = zk;
+    return this;
+  }
+
+  public SamzaConfigFactory setKafka(String brokerList) {
+    this.kafkaBrokerList = brokerList;
+    return this;
+  }
+
+  public SamzaConfigFactory setCheckpointFrequency(int freq) {
+    this.checkpointFrequency = freq;
+    return this;
+  }
+
+  public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
+    this.replicationFactor = replicationFactor;
+    return this;
+  }
+
+  public SamzaConfigFactory setAMMemory(int mem) {
+    this.amMemory = mem;
+    return this;
+  }
+
+  public SamzaConfigFactory setContainerMemory(int mem) {
+    this.containerMemory = mem;
+    return this;
+  }
+
+  public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
+    this.piPerContainerRatio = piPerContainer;
+    return this;
+  }
+
+  public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
+    this.kryoRegisterFile = kryoRegister;
+    return this;
+  }
+
+  /*
+   * Generate a map of all config properties for the input SamzaProcessingItem
+   */
+  private Map<String, String> getMapForPI(SamzaProcessingItem pi, String 
filename, String filesystem) throws Exception {
+    Map<String, String> map = getBasicSystemConfig();
+
+    // Set job name, task class, task inputs (from SamzaProcessingItem)
+    setJobName(map, pi.getName());
+    setTaskClass(map, SamzaProcessingItem.class.getName());
+
+    StringBuilder streamNames = new StringBuilder();
+    boolean first = true;
+    for (SamzaSystemStream stream : pi.getInputStreams()) {
+      if (!first)
+        streamNames.append(COMMA);
+      streamNames.append(stream.getSystem() + DOT + stream.getStream());
+      if (first)
+        first = false;
+    }
+    setTaskInputs(map, streamNames.toString());
+
+    // Processor file
+    setFileName(map, filename);
+    setFileSystem(map, filesystem);
+
+    List<String> nameList = new ArrayList<String>();
+    // Default kafka system: kafka0: sync producer
+    // This system is always required: it is used for checkpointing
+    nameList.add("kafka0");
+    setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);
+    // Output streams: set kafka systems
+    for (SamzaStream stream : pi.getOutputStreams()) {
+      boolean found = false;
+      for (String name : nameList) {
+        if (stream.getSystemName().equals(name)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        nameList.add(stream.getSystemName());
+        setKafkaSystem(map, stream.getSystemName(), this.zookeeper, 
this.kafkaBrokerList, stream.getBatchSize());
+      }
+    }
+    // Input streams: set kafka systems
+    for (SamzaSystemStream stream : pi.getInputStreams()) {
+      boolean found = false;
+      for (String name : nameList) {
+        if (stream.getSystem().equals(name)) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        nameList.add(stream.getSystem());
+        setKafkaSystem(map, stream.getSystem(), this.zookeeper, 
this.kafkaBrokerList, 1);
+      }
+    }
+
+    // Checkpointing
+    setValue(map, "task.checkpoint.factory", 
"org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+    setValue(map, "task.checkpoint.system", "kafka0");
+    setValue(map, "task.commit.ms", "1000");
+    setValue(map, "task.checkpoint.replication.factor", 
Integer.toString(this.replicationFactor));
+
+    // Number of containers
+    setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);
+
+    return map;
+  }
+
+  /*
+   * Generate a map of all config properties for the input SamzaEntranceProcessingItem
+   */
+  public Map<String, String> getMapForEntrancePI(SamzaEntranceProcessingItem 
epi, String filename, String filesystem) {
+    Map<String, String> map = getBasicSystemConfig();
+
+    // Set job name, task class (from SamzaEntranceProcessingItem)
+    setJobName(map, epi.getName());
+    setTaskClass(map, SamzaEntranceProcessingItem.class.getName());
+
+    // Input for the entrance task (from our custom consumer)
+    setTaskInputs(map, SYSTEM_NAME + "." + epi.getName());
+
+    // Output from entrance task
+    // Since entrancePI should have only 1 output stream
+    // there is no need for checking the batch size, setting different system
+    // names
+    // The custom consumer (samoa system) does not support reading from a
+    // specific index
+    // => no need for checkpointing
+    SamzaStream outputStream = (SamzaStream) epi.getOutputStream();
+    // Set samoa system factory
+    setValue(map, "systems." + SYSTEM_NAME + ".samza.factory", 
SamoaSystemFactory.class.getName());
+    // Set Kafka system (only if there is an output stream)
+    if (outputStream != null)
+      setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, 
this.kafkaBrokerList,
+          outputStream.getBatchSize());
+
+    // Processor file
+    setFileName(map, filename);
+    setFileSystem(map, filesystem);
+
+    // Number of containers
+    setNumberOfContainers(map, 1, this.piPerContainerRatio);
+
+    return map;
+  }
+
+  /*
+   * Generate a list of map (of config properties) for all PIs and EPI in the
+   * input topology
+   */
+  public List<Map<String, String>> getMapsForTopology(SamzaTopology topology) 
throws Exception {
+
+    List<Map<String, String>> maps = new ArrayList<Map<String, String>>();
+
+    // File to write serialized objects
+    String filename = topology.getTopologyName() + ".dat";
+    Path dirPath = FileSystems.getDefault().getPath("dat");
+    Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), 
filename);
+    String dstPath = filePath.toString();
+    String resPath;
+    String filesystem;
+    if (this.isLocalMode) {
+      filesystem = SystemsUtils.LOCAL_FS;
+      File dir = dirPath.toFile();
+      if (!dir.exists())
+        FileUtils.forceMkdir(dir);
+    }
+    else {
+      filesystem = SystemsUtils.HDFS;
+    }
+
+    // Correct system name for streams
+    this.setSystemNameForStreams(topology.getStreams());
+
+    // Add all PIs to a collection (map)
+    Map<String, Object> piMap = new HashMap<String, Object>();
+    Set<EntranceProcessingItem> entranceProcessingItems = 
topology.getEntranceProcessingItems();
+    Set<IProcessingItem> processingItems = 
topology.getNonEntranceProcessingItems();
+    for (EntranceProcessingItem epi : entranceProcessingItems) {
+      SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
+      piMap.put(sepi.getName(), sepi);
+    }
+    for (IProcessingItem pi : processingItems) {
+      SamzaProcessingItem spi = (SamzaProcessingItem) pi;
+      piMap.put(spi.getName(), spi);
+    }
+
+    // Serialize all PIs
+    boolean serialized = false;
+    if (this.isLocalMode) {
+      serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, 
dstPath);
+      resPath = dstPath;
+    }
+    else {
+      resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath);
+      serialized = resPath != null;
+    }
+
+    if (!serialized) {
+      throw new Exception("Fail serialize map of PIs to file");
+    }
+
+    // MapConfig for all PIs
+    for (EntranceProcessingItem epi : entranceProcessingItems) {
+      SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
+      maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem));
+    }
+    for (IProcessingItem pi : processingItems) {
+      SamzaProcessingItem spi = (SamzaProcessingItem) pi;
+      maps.add(this.getMapForPI(spi, resPath, filesystem));
+    }
+
+    return maps;
+  }
+
+  /**
+   * Construct a list of MapConfigs for a Topology
+   * 
+   * @return the list of MapConfigs
+   * @throws Exception
+   */
+  public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) 
throws Exception {
+    List<MapConfig> configs = new ArrayList<MapConfig>();
+    List<Map<String, String>> maps = this.getMapsForTopology(topology);
+    for (Map<String, String> map : maps) {
+      configs.add(new MapConfig(map));
+    }
+    return configs;
+  }
+
+  /*
         *
         */
-       public void setSystemNameForStreams(Set<Stream> streams) {
-               Map<Integer, String> batchSizeMap = new HashMap<Integer, 
String>();
-               batchSizeMap.put(1, "kafka0"); // default system with sync 
producer
-               int counter = 0;
-               for (Stream stream:streams) {
-                       SamzaStream samzaStream = (SamzaStream) stream;
-                       String systemName = 
batchSizeMap.get(samzaStream.getBatchSize());
-                       if (systemName == null) {
-                               counter++;
-                               // Add new system
-                               systemName = "kafka"+Integer.toString(counter);
-                               batchSizeMap.put(samzaStream.getBatchSize(), 
systemName);
-                       }
-                       samzaStream.setSystemName(systemName);
-               }
-
-}
-
-       /*
-        * Generate a map with common properties for PIs and EPI
-        */
-       private Map<String,String> getBasicSystemConfig() {
-               Map<String,String> map = new HashMap<String,String>();
-               // Job & Task
-               if (this.isLocalMode) 
-                       map.put(JOB_FACTORY_CLASS_KEY, 
LocalJobFactory.class.getName());
-               else {
-                       map.put(JOB_FACTORY_CLASS_KEY, 
YarnJobFactory.class.getName());
-
-                       // yarn
-                       map.put(YARN_PACKAGE_KEY,jarPath);
-                       map.put(CONTAINER_MEMORY_KEY, 
Integer.toString(this.containerMemory));
-                       map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
-                       map.put(CONTAINER_COUNT_KEY, "1"); 
-                       map.put(YARN_CONF_HOME_KEY, 
SystemsUtils.getHadoopConfigHome());
-                       
-                       // Task opts (Heap size = 0.75 container memory) 
-                       int heapSize = (int)(0.75*this.containerMemory);
-                       map.put("task.opts", 
"-Xmx"+Integer.toString(heapSize)+"M -XX:+PrintGCDateStamps");
-               }
-
-
-               map.put(JOB_NAME_KEY, "");
-               map.put(TASK_CLASS_KEY, "");
-               map.put(TASK_INPUTS_KEY, "");
-
-               // register serializer
-               
map.put("serializers.registry.kryo.class",SamzaKryoSerdeFactory.class.getName());
-
-               // Serde registration
-               setKryoRegistration(map, this.kryoRegisterFile);
-
-               return map;
-       }
-       
-       /*
-        * Helper methods to set different properties in the input map
-        */
-       private static void setJobName(Map<String,String> map, String jobName) {
-               map.put(JOB_NAME_KEY, jobName);
-       }
-
-       private static void setFileName(Map<String,String> map, String 
filename) {
-               map.put(FILE_KEY, filename);
-       }
-
-       private static void setFileSystem(Map<String,String> map, String 
filesystem) {
-               map.put(FILESYSTEM_KEY, filesystem);
-       }
-
-       private static void setTaskClass(Map<String,String> map, String 
taskClass) {
-               map.put(TASK_CLASS_KEY, taskClass);
-       }
-
-       private static void setTaskInputs(Map<String,String> map, String 
inputs) {
-               map.put(TASK_INPUTS_KEY, inputs);
-       }
-
-       private static void setKryoRegistration(Map<String, String> map, String 
kryoRegisterFile) {
-               if (kryoRegisterFile != null) {
-                       String value = readKryoRegistration(kryoRegisterFile);
-                       map.put(SERDE_REGISTRATION_KEY,  value);
-               }
-       }
-       
-       private static void setNumberOfContainers(Map<String, String> map, int 
parallelism, int piPerContainer) {
-               int res = parallelism / piPerContainer;
-               if (parallelism % piPerContainer != 0) res++;
-               map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
-       }
-       
-       private static void setKafkaSystem(Map<String,String> map, String 
systemName, String zk, String brokers, int batchSize) {
-               
map.put("systems."+systemName+".samza.factory",KafkaSystemFactory.class.getName());
-               map.put("systems."+systemName+".samza.msg.serde","kryo");
-
-               map.put("systems."+systemName+"."+ZOOKEEPER_URI_KEY,zk);
-               map.put("systems."+systemName+"."+BROKER_URI_KEY,brokers);
-               
map.put("systems."+systemName+"."+KAFKA_BATCHSIZE_KEY,Integer.toString(batchSize));
-
-               map.put("systems."+systemName+".samza.offset.default","oldest");
-
-               if (batchSize > 1) {
-                       
map.put("systems."+systemName+"."+KAFKA_PRODUCER_TYPE_KEY,"async");
-               }
-               else {
-                       
map.put("systems."+systemName+"."+KAFKA_PRODUCER_TYPE_KEY,"sync");
-               }
-       }
-       
-       // Set custom properties
-       private static void setValue(Map<String,String> map, String key, String 
value) {
-               map.put(key,value);
-       }
-
-       /*
-        * Helper method to parse Kryo registration file
-        */
-       private static String readKryoRegistration(String filePath) {
-               FileInputStream fis = null;
-               Properties props = new Properties();
-               StringBuilder result = new StringBuilder();
-               try {
-                       fis = new FileInputStream(filePath);
-                       props.load(fis);
-
-                       boolean first = true;
-                       String value = null;
-                       for(String k:props.stringPropertyNames()) {
-                               if (!first)
-                                       result.append(COMMA);
-                               else
-                                       first = false;
-                               
-                               // Need to avoid the dollar sign as samza pass 
all the properties in
-                               // the config to containers via commandline 
parameters/enviroment variables
-                               // We might escape the dollar sign, but it's 
more complicated than
-                               // replacing it with something else
-                               result.append(k.trim().replace(DOLLAR_SIGN, 
QUESTION_MARK));
-                               value = props.getProperty(k);
-                               if (value != null && value.trim().length() > 0) 
{
-                                       result.append(COLON);
-                                       
result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
-                               }
-                       }
-               } catch (FileNotFoundException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               } catch (IOException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               } finally {
-                       if (fis != null)
-                               try {
-                                       fis.close();
-                               } catch (IOException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
-                               }
-               }
-
-               return result.toString();
-       }
+  public void setSystemNameForStreams(Set<Stream> streams) {
+    Map<Integer, String> batchSizeMap = new HashMap<Integer, String>();
+    batchSizeMap.put(1, "kafka0"); // default system with sync producer
+    int counter = 0;
+    for (Stream stream : streams) {
+      SamzaStream samzaStream = (SamzaStream) stream;
+      String systemName = batchSizeMap.get(samzaStream.getBatchSize());
+      if (systemName == null) {
+        counter++;
+        // Add new system
+        systemName = "kafka" + Integer.toString(counter);
+        batchSizeMap.put(samzaStream.getBatchSize(), systemName);
+      }
+      samzaStream.setSystemName(systemName);
+    }
+
+  }
+
+  /*
+   * Generate a map with common properties for PIs and EPI
+   */
+  private Map<String, String> getBasicSystemConfig() {
+    Map<String, String> map = new HashMap<String, String>();
+    // Job & Task
+    if (this.isLocalMode)
+      map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName());
+    else {
+      map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName());
+
+      // yarn
+      map.put(YARN_PACKAGE_KEY, jarPath);
+      map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory));
+      map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
+      map.put(CONTAINER_COUNT_KEY, "1");
+      map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome());
+
+      // Task opts (Heap size = 0.75 container memory)
+      int heapSize = (int) (0.75 * this.containerMemory);
+      map.put("task.opts", "-Xmx" + Integer.toString(heapSize) + "M 
-XX:+PrintGCDateStamps");
+    }
+
+    map.put(JOB_NAME_KEY, "");
+    map.put(TASK_CLASS_KEY, "");
+    map.put(TASK_INPUTS_KEY, "");
+
+    // register serializer
+    map.put("serializers.registry.kryo.class", 
SamzaKryoSerdeFactory.class.getName());
+
+    // Serde registration
+    setKryoRegistration(map, this.kryoRegisterFile);
+
+    return map;
+  }
+
+  /*
+   * Helper methods to set different properties in the input map
+   */
+  private static void setJobName(Map<String, String> map, String jobName) {
+    map.put(JOB_NAME_KEY, jobName);
+  }
+
+  private static void setFileName(Map<String, String> map, String filename) {
+    map.put(FILE_KEY, filename);
+  }
+
+  private static void setFileSystem(Map<String, String> map, String 
filesystem) {
+    map.put(FILESYSTEM_KEY, filesystem);
+  }
+
+  private static void setTaskClass(Map<String, String> map, String taskClass) {
+    map.put(TASK_CLASS_KEY, taskClass);
+  }
+
+  private static void setTaskInputs(Map<String, String> map, String inputs) {
+    map.put(TASK_INPUTS_KEY, inputs);
+  }
+
+  private static void setKryoRegistration(Map<String, String> map, String 
kryoRegisterFile) {
+    if (kryoRegisterFile != null) {
+      String value = readKryoRegistration(kryoRegisterFile);
+      map.put(SERDE_REGISTRATION_KEY, value);
+    }
+  }
+
+  private static void setNumberOfContainers(Map<String, String> map, int 
parallelism, int piPerContainer) {
+    int res = parallelism / piPerContainer;
+    if (parallelism % piPerContainer != 0)
+      res++;
+    map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
+  }
+
+  private static void setKafkaSystem(Map<String, String> map, String 
systemName, String zk, String brokers,
+      int batchSize) {
+    map.put("systems." + systemName + ".samza.factory", 
KafkaSystemFactory.class.getName());
+    map.put("systems." + systemName + ".samza.msg.serde", "kryo");
+
+    map.put("systems." + systemName + "." + ZOOKEEPER_URI_KEY, zk);
+    map.put("systems." + systemName + "." + BROKER_URI_KEY, brokers);
+    map.put("systems." + systemName + "." + KAFKA_BATCHSIZE_KEY, 
Integer.toString(batchSize));
+
+    map.put("systems." + systemName + ".samza.offset.default", "oldest");
+
+    if (batchSize > 1) {
+      map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, 
"async");
+    }
+    else {
+      map.put("systems." + systemName + "." + KAFKA_PRODUCER_TYPE_KEY, "sync");
+    }
+  }
+
+  // Set custom properties
+  private static void setValue(Map<String, String> map, String key, String 
value) {
+    map.put(key, value);
+  }
+
+  /*
+   * Helper method to parse Kryo registration file
+   */
+  private static String readKryoRegistration(String filePath) {
+    FileInputStream fis = null;
+    Properties props = new Properties();
+    StringBuilder result = new StringBuilder();
+    try {
+      fis = new FileInputStream(filePath);
+      props.load(fis);
+
+      boolean first = true;
+      String value = null;
+      for (String k : props.stringPropertyNames()) {
+        if (!first)
+          result.append(COMMA);
+        else
+          first = false;
+
+        // Need to avoid the dollar sign as samza passes all the properties in
+        // the config to containers via commandline parameters/environment
+        // variables
+        // We might escape the dollar sign, but it's more complicated than
+        // replacing it with something else
+        result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
+        value = props.getProperty(k);
+        if (value != null && value.trim().length() > 0) {
+          result.append(COLON);
+          result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
+        }
+      }
+    } catch (FileNotFoundException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } finally {
+      if (fis != null)
+        try {
+          fis.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+    }
+
+    return result.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
index 8e9e446..cd4b846 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
@@ -34,122 +34,126 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
 /**
- * Implementation of Samza's SerdeFactory
- * that uses Kryo to serialize/deserialize objects
+ * Implementation of Samza's SerdeFactory that uses Kryo to
+ * serialize/deserialize objects
  * 
  * @author Anh Thu Vu
  * @param <T>
- *
+ * 
  */
 public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> {
-       
-       private static final Logger logger = 
LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);
-       
-       public static class SamzaKryoSerde<V> implements Serde<V> {
-               private Kryo kryo;
-               
-               public SamzaKryoSerde (String registrationInfo) {
-                       this.kryo = new Kryo();
-                       this.register(registrationInfo);
-               }
-               
-               @SuppressWarnings({ "rawtypes", "unchecked" })
-               private void register(String registrationInfo) {
-                       if (registrationInfo == null) return;
-                       
-                       String[] infoList = 
registrationInfo.split(SamzaConfigFactory.COMMA);
-                       
-                       Class targetClass = null;
-                       Class serializerClass = null;
-                       Serializer serializer = null;
-                       
-                       for (String info:infoList) {
-                               String[] fields = 
info.split(SamzaConfigFactory.COLON);
-                               
-                               targetClass = null;
-                               serializerClass = null;
-                               if (fields.length > 0) {
-                                       try {
-                                               targetClass = 
Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, 
SamzaConfigFactory.DOLLAR_SIGN));
-                                       } catch (ClassNotFoundException e) {
-                                               // TODO Auto-generated catch 
block
-                                               e.printStackTrace();
-                                       }
-                               }
-                               if (fields.length > 1) {
-                                       try {
-                                               serializerClass = 
Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, 
SamzaConfigFactory.DOLLAR_SIGN));
-                                       } catch (ClassNotFoundException e) {
-                                               // TODO Auto-generated catch 
block
-                                               e.printStackTrace();
-                                       }
-                               }
-                               
-                               if (targetClass != null) {
-                                       if (serializerClass == null) {
-                                               kryo.register(targetClass);
-                                       }
-                                       else {
-                                               serializer = 
resolveSerializerInstance(kryo, targetClass, (Class<? extends 
Serializer>)serializerClass) ;
-                                               kryo.register(targetClass, 
serializer);
-                                       }
-                               }
-                               else {
-                                       logger.info("Invalid registration 
info:{}",info);
-                               }
-                       }
-               }
-               
-               @SuppressWarnings("rawtypes")
-               private static Serializer resolveSerializerInstance(Kryo k, 
Class superClass, Class<? extends Serializer> serializerClass) {
-               try {
-                   try {
-                       return serializerClass.getConstructor(Kryo.class, 
Class.class).newInstance(k, superClass);
-                   } catch (Exception ex1) {
-                       try {
-                           return 
serializerClass.getConstructor(Kryo.class).newInstance(k);
-                       } catch (Exception ex2) {
-                           try {
-                               return 
serializerClass.getConstructor(Class.class).newInstance(superClass);
-                           } catch (Exception ex3) {
-                               return serializerClass.newInstance();
-                           }
-                       }
-                   }
-               } catch (Exception ex) {
-                   throw new IllegalArgumentException("Unable to create 
serializer \""
-                                                      + 
serializerClass.getName()
-                                                      + "\" for class: "
-                                                      + superClass.getName(), 
ex);
-               }
-           }
-               
-               /*
-                * Implement Samza Serde interface
-                */
-               @Override
-               public byte[] toBytes(V obj) {
-                       ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                       Output output = new Output(bos);
-                       kryo.writeClassAndObject(output, obj);
-                       output.flush();
-                       output.close();
-                       return bos.toByteArray();
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public V fromBytes(byte[] byteArr) {
-                       Input input = new Input(byteArr);
-                       Object obj = kryo.readClassAndObject(input);
-                       input.close();
-                       return (V)obj;
-               }
-               
-       }
-
-       @Override
-       public Serde<T> getSerde(String name, Config config) {
-               return new 
SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
-       }
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);
+
+  public static class SamzaKryoSerde<V> implements Serde<V> {
+    private Kryo kryo;
+
+    public SamzaKryoSerde(String registrationInfo) {
+      this.kryo = new Kryo();
+      this.register(registrationInfo);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private void register(String registrationInfo) {
+      if (registrationInfo == null)
+        return;
+
+      String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA);
+
+      Class targetClass = null;
+      Class serializerClass = null;
+      Serializer serializer = null;
+
+      for (String info : infoList) {
+        String[] fields = info.split(SamzaConfigFactory.COLON);
+
+        targetClass = null;
+        serializerClass = null;
+        if (fields.length > 0) {
+          try {
+            targetClass = 
Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK,
+                SamzaConfigFactory.DOLLAR_SIGN));
+          } catch (ClassNotFoundException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+        if (fields.length > 1) {
+          try {
+            serializerClass = 
Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK,
+                SamzaConfigFactory.DOLLAR_SIGN));
+          } catch (ClassNotFoundException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+        }
+
+        if (targetClass != null) {
+          if (serializerClass == null) {
+            kryo.register(targetClass);
+          }
+          else {
+            serializer = resolveSerializerInstance(kryo, targetClass, (Class<? 
extends Serializer>) serializerClass);
+            kryo.register(targetClass, serializer);
+          }
+        }
+        else {
+          logger.info("Invalid registration info:{}", info);
+        }
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static Serializer resolveSerializerInstance(Kryo k, Class 
superClass,
+        Class<? extends Serializer> serializerClass) {
+      try {
+        try {
+          return serializerClass.getConstructor(Kryo.class, 
Class.class).newInstance(k, superClass);
+        } catch (Exception ex1) {
+          try {
+            return serializerClass.getConstructor(Kryo.class).newInstance(k);
+          } catch (Exception ex2) {
+            try {
+              return 
serializerClass.getConstructor(Class.class).newInstance(superClass);
+            } catch (Exception ex3) {
+              return serializerClass.newInstance();
+            }
+          }
+        }
+      } catch (Exception ex) {
+        throw new IllegalArgumentException("Unable to create serializer \""
+            + serializerClass.getName()
+            + "\" for class: "
+            + superClass.getName(), ex);
+      }
+    }
+
+    /*
+     * Implement Samza Serde interface
+     */
+    @Override
+    public byte[] toBytes(V obj) {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      kryo.writeClassAndObject(output, obj);
+      output.flush();
+      output.close();
+      return bos.toByteArray();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public V fromBytes(byte[] byteArr) {
+      Input input = new Input(byteArr);
+      Object obj = kryo.readClassAndObject(input);
+      input.close();
+      return (V) obj;
+    }
+
+  }
+
+  @Override
+  public Serde<T> getSerde(String name, Config config) {
+    return new 
SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
index 4bdbafd..62fa3fd 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
@@ -37,34 +37,34 @@ import com.esotericsoftware.kryo.io.Output;
  * @author Anh Thu Vu
  */
 public class SerializableSerializer extends Serializer<Object> {
-       @Override
-    public void write(Kryo kryo, Output output, Object object) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        try {
-            ObjectOutputStream oos = new ObjectOutputStream(bos);
-            oos.writeObject(object);
-            oos.flush();
-        } catch(IOException e) {
-            throw new RuntimeException(e);
-        }
-        byte[] ser = bos.toByteArray();
-        output.writeInt(ser.length);
-        output.writeBytes(ser);
+  @Override
+  public void write(Kryo kryo, Output output, Object object) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(bos);
+      oos.writeObject(object);
+      oos.flush();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-    
-    @SuppressWarnings("rawtypes")
-       @Override
-    public Object read(Kryo kryo, Input input, Class c) {
-        int len = input.readInt();
-        byte[] ser = new byte[len];
-        input.readBytes(ser);
-        ByteArrayInputStream bis = new ByteArrayInputStream(ser);
-        try {
-            ObjectInputStream ois = new ObjectInputStream(bis);
-            return ois.readObject();
-        } catch(Exception e) {
-            throw new RuntimeException(e);
-        }
+    byte[] ser = bos.toByteArray();
+    output.writeInt(ser.length);
+    output.writeBytes(ser);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Object read(Kryo kryo, Input input, Class c) {
+    int len = input.readInt();
+    byte[] ser = new byte[len];
+    input.readBytes(ser);
+    ByteArrayInputStream bis = new ByteArrayInputStream(ser);
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bis);
+      return ois.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
index 367f9f9..f8e6dcd 100644
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
@@ -45,339 +45,342 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Utilities methods for:
- * - Kafka
- * - HDFS
- * - Handling files on local FS
+ * Utility methods for Kafka, HDFS, and handling files on the local FS.
  * 
  * @author Anh Thu Vu
  */
 public class SystemsUtils {
-       private static final Logger logger = 
LoggerFactory.getLogger(SystemsUtils.class);
-       
-       public static final String HDFS = "hdfs";
-       public static final String LOCAL_FS = "local";
-       
-       private static final String TEMP_FILE = "samoaTemp";
-       private static final String TEMP_FILE_SUFFIX = ".dat";
-       
-       /*
-        * Kafka
-        */
-       private static class KafkaUtils {
-               private static ZkClient zkClient;
-               
-               static void setZookeeper(String zk) {
-                       zkClient = new ZkClient(zk, 30000, 30000, new 
ZKStringSerializerWrapper());
-               }
-
-               /*
-                * Create Kafka topic/stream
-                */
-               static void createKafkaTopic(String name, int partitions, int 
replicas) {
-                       AdminUtils.createTopic(zkClient, name, partitions, 
replicas, new Properties());
-               }
-               
-               static class ZKStringSerializerWrapper implements ZkSerializer {
-                       @Override
-                       public Object deserialize(byte[] byteArray) throws 
ZkMarshallingError {
-                               return 
ZKStringSerializer.deserialize(byteArray);
-                       }
-
-                       @Override
-                       public byte[] serialize(Object obj) throws 
ZkMarshallingError {
-                               return ZKStringSerializer.serialize(obj);
-                       }
-               }
-       }
-       
-       /*
-        * HDFS
-        */
-       private static class HDFSUtils {
-               private static String coreConfPath;
-               private static String hdfsConfPath;
-               private static String configHomePath;
-               private static String samoaDir = null;
-               
-               static void setHadoopConfigHome(String hadoopConfPath) {
-                       logger.info("Hadoop config home:{}",hadoopConfPath);
-                       configHomePath = hadoopConfPath;
-                       java.nio.file.Path coreSitePath = 
FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
-                       java.nio.file.Path hdfsSitePath = 
FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
-                       coreConfPath = coreSitePath.toAbsolutePath().toString();
-                       hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
-               }
-               
-               static String getNameNodeUri() {
-                       Configuration config = new Configuration();
-                       config.addResource(new Path(coreConfPath));
-                       config.addResource(new Path(hdfsConfPath));
-                       
-                       return config.get("fs.defaultFS");
-               }
-               
-               static String getHadoopConfigHome() {
-                       return configHomePath;
-               }
-               
-               static void setSAMOADir(String dir) {
-                       if (dir != null)
-                               samoaDir = getNameNodeUri()+dir;
-                       else 
-                               samoaDir = null;
-               }
-               
-               static String getDefaultSAMOADir() throws IOException {
-                       Configuration config = new Configuration();
-                       config.addResource(new Path(coreConfPath));
-                       config.addResource(new Path(hdfsConfPath));
-                       
-                       FileSystem fs = FileSystem.get(config);
-                       Path defaultDir = new 
Path(fs.getHomeDirectory(),".samoa");
-                       return defaultDir.toString();
-               }
-               
-               static boolean deleteFileIfExist(String absPath) {
-                       Path p = new Path(absPath);
-                       return deleteFileIfExist(p);
-               }
-               
-               static boolean deleteFileIfExist(Path p) {
-                       Configuration config = new Configuration();
-                       config.addResource(new Path(coreConfPath));
-                       config.addResource(new Path(hdfsConfPath));
-                       
-                       FileSystem fs;
-                       try {
-                               fs = FileSystem.get(config);
-                               if (fs.exists(p)) {
-                                       return fs.delete(p, false);
-                               }
-                               else 
-                                       return true;
-                       } catch (IOException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
-                       return false;
-               }
-               
-               /*
-                * Write to HDFS
-                */
-               static String writeToHDFS(File file, String dstPath) {
-                       Configuration config = new Configuration();
-                       config.addResource(new Path(coreConfPath));
-                       config.addResource(new Path(hdfsConfPath));
-                       logger.info("Filesystem 
name:{}",config.get("fs.defaultFS"));
-                       
-                       // Default samoaDir
-                       if (samoaDir == null) {
-                               try {
-                                       samoaDir = getDefaultSAMOADir();
-                               }
-                               catch (IOException e) {
-                                       e.printStackTrace();
-                                       return null;
-                               }
-                       }
-                       
-                       // Setup src and dst paths
-                       //java.nio.file.Path tempPath = 
FileSystems.getDefault().getPath(samoaDir, dstPath);
-                       Path dst = new Path(samoaDir,dstPath);
-                       Path src = new Path(file.getAbsolutePath());
-                       
-                       // Delete file if already exists in HDFS
-                       if (deleteFileIfExist(dst) == false)
-                               return null;
-                       
-                       // Copy to HDFS
-                       FileSystem fs;
-                       try {
-                               fs = FileSystem.get(config);
-                               fs.copyFromLocalFile(src, dst);
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                               return null;
-                       }
-                       
-                       return dst.toString(); // abs path to file
-               }
-               
-               /*
-                * Read from HDFS
-                */
-               static Object deserializeObjectFromFile(String filePath) {
-                       logger.info("Deserialize HDFS file:{}",filePath);
-                       Configuration config = new Configuration();
-                       config.addResource(new Path(coreConfPath));
-                       config.addResource(new Path(hdfsConfPath));
-                       
-                       Path file = new Path(filePath);
-                       FSDataInputStream dataInputStream = null;
-                       ObjectInputStream ois = null;
-                       Object obj = null;
-                       FileSystem fs;
-                       try {
-                               fs = FileSystem.get(config);
-                               dataInputStream = fs.open(file);
-                               ois = new ObjectInputStream(dataInputStream);
-                               obj = ois.readObject();
-                       } catch (IOException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       } catch (ClassNotFoundException e) {
-                               try {
-                                       if (dataInputStream != null) 
dataInputStream.close();
-                                       if (ois != null) ois.close();
-                               } catch (IOException ioException) {
-                                       // TODO auto-generated catch block
-                                       e.printStackTrace();
-                               }
-                       }
-                       
-                       return obj;
-               }
-               
-       }
-       
-       private static class LocalFileSystemUtils {
-               static boolean serializObjectToFile(Object obj, String fn) {
-                       FileOutputStream fos = null;
-                       ObjectOutputStream oos = null;
-                       try {
-                               fos = new FileOutputStream(fn);
-                               oos = new ObjectOutputStream(fos);
-                               oos.writeObject(obj);
-                       } catch (FileNotFoundException e) {
-                               e.printStackTrace();
-                               return false;
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                               return false;
-                       } finally {
-                               try {
-                                       if (fos != null) fos.close();
-                                       if (oos != null) oos.close();
-                               } catch (IOException e) {
-                                       e.printStackTrace();
-                               }
-                       }
-
-                       return true;
-               }
-       
-               static Object deserializeObjectFromLocalFile(String filename) {
-                       logger.info("Deserialize local file:{}",filename);
-                       FileInputStream fis = null;
-                       ObjectInputStream ois = null;
-                       Object obj = null;
-                       try {
-                               fis = new FileInputStream(filename);
-                               ois = new ObjectInputStream(fis);
-                               obj = ois.readObject();
-                       } catch (IOException e) {
-                               // TODO auto-generated catch block
-                               e.printStackTrace();
-                       } catch (ClassNotFoundException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       } finally {
-                               try {
-                                       if (fis != null) fis.close();
-                                       if (ois != null) ois.close();
-                               } catch (IOException e) {
-                                       // TODO auto-generated catch block
-                                       e.printStackTrace();
-                               }
-                       }
-
-                       return obj;
-               }
-       }
-       
-       
-       
-       /*
-        * Create streams
-        */
-       public static void createKafkaTopic(String name, int partitions) {
-               createKafkaTopic(name, partitions, 1);
-       }
-       
-       public static void createKafkaTopic(String name, int partitions, int 
replicas) {
-               KafkaUtils.createKafkaTopic(name, partitions, replicas);
-       }
-       
-       /*
-        * Serialize object
-        */
-       public static boolean serializeObjectToLocalFileSystem(Object object, 
String path) {
-               return LocalFileSystemUtils.serializObjectToFile(object, path);
-       }
-       
-       public static String serializeObjectToHDFS(Object object, String path) {
-               File tmpDatFile;
-               try {
-                       tmpDatFile = File.createTempFile(TEMP_FILE, 
TEMP_FILE_SUFFIX);
-                       if (serializeObjectToLocalFileSystem(object, 
tmpDatFile.getAbsolutePath())) {
-                               return HDFSUtils.writeToHDFS(tmpDatFile, path);
-                       }
-               } catch (IOException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               }
-               return null;
-       }
-       
-       /*
-        * Deserialize object
-        */
-       @SuppressWarnings("unchecked")
-       public static Map<String,Object> deserializeMapFromFile(String 
filesystem, String filename) {
-               Map<String,Object> map;
-               if (filesystem.equals(HDFS)) {
-                       map = (Map<String,Object>) 
HDFSUtils.deserializeObjectFromFile(filename);
-               }
-               else {
-                       map = (Map<String,Object>) 
LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
-               }
-               return map;
-       }
-       
-       public static Object deserializeObjectFromFileAndKey(String filesystem, 
String filename, String key) {
-               Map<String,Object> map = deserializeMapFromFile(filesystem, 
filename);
-               if (map == null) return null;
-               return map.get(key);
-       }
-       
-       /*
-        * Setup
-        */
-       public static void setZookeeper(String zookeeper) {
-               KafkaUtils.setZookeeper(zookeeper);
-       }
-       
-       public static void setHadoopConfigHome(String hadoopHome) {
-               HDFSUtils.setHadoopConfigHome(hadoopHome);
-       }
-       
-       public static void setSAMOADir(String samoaDir) {
-               HDFSUtils.setSAMOADir(samoaDir);
-       }
-       
-       /*
-        * Others
-        */
-       public static String getHDFSNameNodeUri() {
-               return HDFSUtils.getNameNodeUri();
-       }
-       public static String getHadoopConfigHome() {
-               return HDFSUtils.getHadoopConfigHome();
-       }
-       
-       public static String copyToHDFS(File file, String dstPath) {
-               return HDFSUtils.writeToHDFS(file, dstPath);
-       }
+  private static final Logger logger = 
LoggerFactory.getLogger(SystemsUtils.class);
+
+  public static final String HDFS = "hdfs";
+  public static final String LOCAL_FS = "local";
+
+  private static final String TEMP_FILE = "samoaTemp";
+  private static final String TEMP_FILE_SUFFIX = ".dat";
+
+  /*
+   * Kafka
+   */
  /*
   * Kafka
   */
  private static class KafkaUtils {
    // Shared ZooKeeper client. Must be initialized through setZookeeper()
    // before createKafkaTopic() is used; otherwise AdminUtils receives null.
    private static ZkClient zkClient;

    static void setZookeeper(String zk) {
      // 30000 ms session timeout, 30000 ms connection timeout.
      zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper());
    }

    /*
     * Create Kafka topic/stream with the given partition count and
     * replication factor, using default topic-level properties.
     */
    static void createKafkaTopic(String name, int partitions, int replicas) {
      AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties());
    }

    // Adapts ZKStringSerializer (presumably Kafka's Scala helper -- confirm
    // against the imports above this chunk) to the ZkSerializer interface
    // that ZkClient expects.
    static class ZKStringSerializerWrapper implements ZkSerializer {
      @Override
      public Object deserialize(byte[] byteArray) throws ZkMarshallingError {
        return ZKStringSerializer.deserialize(byteArray);
      }

      @Override
      public byte[] serialize(Object obj) throws ZkMarshallingError {
        return ZKStringSerializer.serialize(obj);
      }
    }
  }
+
+  /*
+   * HDFS
+   */
+  private static class HDFSUtils {
+    private static String coreConfPath;
+    private static String hdfsConfPath;
+    private static String configHomePath;
+    private static String samoaDir = null;
+
+    static void setHadoopConfigHome(String hadoopConfPath) {
+      logger.info("Hadoop config home:{}", hadoopConfPath);
+      configHomePath = hadoopConfPath;
+      java.nio.file.Path coreSitePath = 
FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
+      java.nio.file.Path hdfsSitePath = 
FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
+      coreConfPath = coreSitePath.toAbsolutePath().toString();
+      hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
+    }
+
+    static String getNameNodeUri() {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      return config.get("fs.defaultFS");
+    }
+
+    static String getHadoopConfigHome() {
+      return configHomePath;
+    }
+
+    static void setSAMOADir(String dir) {
+      if (dir != null)
+        samoaDir = getNameNodeUri() + dir;
+      else
+        samoaDir = null;
+    }
+
+    static String getDefaultSAMOADir() throws IOException {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      FileSystem fs = FileSystem.get(config);
+      Path defaultDir = new Path(fs.getHomeDirectory(), ".samoa");
+      return defaultDir.toString();
+    }
+
+    static boolean deleteFileIfExist(String absPath) {
+      Path p = new Path(absPath);
+      return deleteFileIfExist(p);
+    }
+
+    static boolean deleteFileIfExist(Path p) {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(config);
+        if (fs.exists(p)) {
+          return fs.delete(p, false);
+        }
+        else
+          return true;
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+      return false;
+    }
+
+    /*
+     * Write to HDFS
+     */
+    static String writeToHDFS(File file, String dstPath) {
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+      logger.info("Filesystem name:{}", config.get("fs.defaultFS"));
+
+      // Default samoaDir
+      if (samoaDir == null) {
+        try {
+          samoaDir = getDefaultSAMOADir();
+        } catch (IOException e) {
+          e.printStackTrace();
+          return null;
+        }
+      }
+
+      // Setup src and dst paths
+      // java.nio.file.Path tempPath =
+      // FileSystems.getDefault().getPath(samoaDir, dstPath);
+      Path dst = new Path(samoaDir, dstPath);
+      Path src = new Path(file.getAbsolutePath());
+
+      // Delete file if already exists in HDFS
+      if (deleteFileIfExist(dst) == false)
+        return null;
+
+      // Copy to HDFS
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(config);
+        fs.copyFromLocalFile(src, dst);
+      } catch (IOException e) {
+        e.printStackTrace();
+        return null;
+      }
+
+      return dst.toString(); // abs path to file
+    }
+
+    /*
+     * Read from HDFS
+     */
+    static Object deserializeObjectFromFile(String filePath) {
+      logger.info("Deserialize HDFS file:{}", filePath);
+      Configuration config = new Configuration();
+      config.addResource(new Path(coreConfPath));
+      config.addResource(new Path(hdfsConfPath));
+
+      Path file = new Path(filePath);
+      FSDataInputStream dataInputStream = null;
+      ObjectInputStream ois = null;
+      Object obj = null;
+      FileSystem fs;
+      try {
+        fs = FileSystem.get(config);
+        dataInputStream = fs.open(file);
+        ois = new ObjectInputStream(dataInputStream);
+        obj = ois.readObject();
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        try {
+          if (dataInputStream != null)
+            dataInputStream.close();
+          if (ois != null)
+            ois.close();
+        } catch (IOException ioException) {
+          // TODO auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+      return obj;
+    }
+
+  }
+
+  private static class LocalFileSystemUtils {
+    static boolean serializObjectToFile(Object obj, String fn) {
+      FileOutputStream fos = null;
+      ObjectOutputStream oos = null;
+      try {
+        fos = new FileOutputStream(fn);
+        oos = new ObjectOutputStream(fos);
+        oos.writeObject(obj);
+      } catch (FileNotFoundException e) {
+        e.printStackTrace();
+        return false;
+      } catch (IOException e) {
+        e.printStackTrace();
+        return false;
+      } finally {
+        try {
+          if (fos != null)
+            fos.close();
+          if (oos != null)
+            oos.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+
+      return true;
+    }
+
+    static Object deserializeObjectFromLocalFile(String filename) {
+      logger.info("Deserialize local file:{}", filename);
+      FileInputStream fis = null;
+      ObjectInputStream ois = null;
+      Object obj = null;
+      try {
+        fis = new FileInputStream(filename);
+        ois = new ObjectInputStream(fis);
+        obj = ois.readObject();
+      } catch (IOException e) {
+        // TODO auto-generated catch block
+        e.printStackTrace();
+      } catch (ClassNotFoundException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      } finally {
+        try {
+          if (fis != null)
+            fis.close();
+          if (ois != null)
+            ois.close();
+        } catch (IOException e) {
+          // TODO auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+
+      return obj;
+    }
+  }
+
+  /*
+   * Create streams
+   */
+  public static void createKafkaTopic(String name, int partitions) {
+    createKafkaTopic(name, partitions, 1);
+  }
+
+  public static void createKafkaTopic(String name, int partitions, int 
replicas) {
+    KafkaUtils.createKafkaTopic(name, partitions, replicas);
+  }
+
+  /*
+   * Serialize object
+   */
+  public static boolean serializeObjectToLocalFileSystem(Object object, String 
path) {
+    return LocalFileSystemUtils.serializObjectToFile(object, path);
+  }
+
+  public static String serializeObjectToHDFS(Object object, String path) {
+    File tmpDatFile;
+    try {
+      tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX);
+      if (serializeObjectToLocalFileSystem(object, 
tmpDatFile.getAbsolutePath())) {
+        return HDFSUtils.writeToHDFS(tmpDatFile, path);
+      }
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  /*
+   * Deserialize object
+   */
+  @SuppressWarnings("unchecked")
+  public static Map<String, Object> deserializeMapFromFile(String filesystem, 
String filename) {
+    Map<String, Object> map;
+    if (filesystem.equals(HDFS)) {
+      map = (Map<String, Object>) 
HDFSUtils.deserializeObjectFromFile(filename);
+    }
+    else {
+      map = (Map<String, Object>) 
LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
+    }
+    return map;
+  }
+
+  public static Object deserializeObjectFromFileAndKey(String filesystem, 
String filename, String key) {
+    Map<String, Object> map = deserializeMapFromFile(filesystem, filename);
+    if (map == null)
+      return null;
+    return map.get(key);
+  }
+
+  /*
+   * Setup
+   */
+  public static void setZookeeper(String zookeeper) {
+    KafkaUtils.setZookeeper(zookeeper);
+  }
+
+  public static void setHadoopConfigHome(String hadoopHome) {
+    HDFSUtils.setHadoopConfigHome(hadoopHome);
+  }
+
+  public static void setSAMOADir(String samoaDir) {
+    HDFSUtils.setSAMOADir(samoaDir);
+  }
+
+  /*
+   * Others
+   */
+  public static String getHDFSNameNodeUri() {
+    return HDFSUtils.getNameNodeUri();
+  }
+
+  public static String getHadoopConfigHome() {
+    return HDFSUtils.getHadoopConfigHome();
+  }
+
+  public static String copyToHDFS(File file, String dstPath) {
+    return HDFSUtils.writeToHDFS(file, dstPath);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
index 54792ae..d6ea26e 100644
--- a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
+++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
@@ -34,45 +34,46 @@ import com.yahoo.labs.samoa.topology.impl.StormTopology;
 
 /**
  * The main class to execute a SAMOA task in LOCAL mode in Storm.
- *
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class LocalStormDoTask {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(LocalStormDoTask.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(LocalStormDoTask.class);
 
-    /**
-     * The main method.
-     *
-     * @param args the arguments
-     */
-    public static void main(String[] args) {
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
 
-        List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
 
-        int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
+    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
 
-        args = tmpArgs.toArray(new String[0]);
+    args = tmpArgs.toArray(new String[0]);
 
-        //convert the arguments into Storm topology
-        StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
-        String topologyName = stormTopo.getTopologyName();
+    // convert the arguments into Storm topology
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+    String topologyName = stormTopo.getTopologyName();
 
-        Config conf = new Config();
-        //conf.putAll(Utils.readStormConfig());
-        conf.setDebug(false);
+    Config conf = new Config();
+    // conf.putAll(Utils.readStormConfig());
+    conf.setDebug(false);
 
-        //local mode
-        conf.setMaxTaskParallelism(numWorker);
+    // local mode
+    conf.setMaxTaskParallelism(numWorker);
 
-        backtype.storm.LocalCluster cluster = new 
backtype.storm.LocalCluster();
-        cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
+    backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
+    cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
 
-        backtype.storm.utils.Utils.sleep(600 * 1000);
+    backtype.storm.utils.Utils.sleep(600 * 1000);
 
-        cluster.killTopology(topologyName);
-        cluster.shutdown();
+    cluster.killTopology(topologyName);
+    cluster.shutdown();
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
index ad9794d..84a9336 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
@@ -26,40 +26,41 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 
 /**
  * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
-class StormBoltStream extends StormStream{
-       
-       /**
+class StormBoltStream extends StormStream {
+
+  /**
         * 
         */
-       private static final long serialVersionUID = -5712513402991550847L;
-       
-       private OutputCollector outputCollector;
+  private static final long serialVersionUID = -5712513402991550847L;
+
+  private OutputCollector outputCollector;
+
+  StormBoltStream(String stormComponentId) {
+    super(stormComponentId);
+  }
 
-       StormBoltStream(String stormComponentId){
-               super(stormComponentId);
-       }
+  @Override
+  public void put(ContentEvent contentEvent) {
+    outputCollector.emit(this.outputStreamId, new Values(contentEvent, 
contentEvent.getKey()));
+  }
 
-       @Override
-       public void put(ContentEvent contentEvent) {
-               outputCollector.emit(this.outputStreamId, new 
Values(contentEvent, contentEvent.getKey()));
-       }
-       
-       public void setCollector(OutputCollector outputCollector){
-               this.outputCollector = outputCollector;
-       }
+  public void setCollector(OutputCollector outputCollector) {
+    this.outputCollector = outputCollector;
+  }
 
-//     @Override
-//     public void setStreamId(String streamId) {
-//             // TODO Auto-generated method stub
-//             //this.outputStreamId = streamId;
-//     }
+  // @Override
+  // public void setStreamId(String streamId) {
+  // // TODO Auto-generated method stub
+  // //this.outputStreamId = streamId;
+  // }
 
-       @Override
-       public String getStreamId() {
-               // TODO Auto-generated method stub
-               return null;
-       }
+  @Override
+  public String getStreamId() {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
index 347fd50..9a2bc65 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
@@ -37,54 +37,54 @@ import com.yahoo.labs.samoa.topology.Topology;
  */
 public final class StormComponentFactory implements ComponentFactory {
 
-    private final Map<String, Integer> processorList;
-
-    public StormComponentFactory() {
-        processorList = new HashMap<>();
-    }
-
-    @Override
-    public ProcessingItem createPi(Processor processor) {
-        return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), 1);
+  private final Map<String, Integer> processorList;
+
+  public StormComponentFactory() {
+    processorList = new HashMap<>();
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), 1);
+  }
+
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new StormEntranceProcessingItem(processor, 
this.getComponentName(processor.getClass()));
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi;
+    return stormCompatiblePi.createStream();
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new StormTopology(topoName);
+  }
+
+  private String getComponentName(Class<? extends Processor> clazz) {
+    StringBuilder componentName = new StringBuilder(clazz.getCanonicalName());
+    String key = componentName.toString();
+    Integer index;
+
+    if (!processorList.containsKey(key)) {
+      index = 1;
+    } else {
+      index = processorList.get(key) + 1;
     }
 
-    @Override
-    public EntranceProcessingItem createEntrancePi(EntranceProcessor 
processor) {
-        return new StormEntranceProcessingItem(processor, 
this.getComponentName(processor.getClass()));
-    }
+    processorList.put(key, index);
 
-    @Override
-    public Stream createStream(IProcessingItem sourcePi) {
-        StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi;
-        return stormCompatiblePi.createStream();
-    }
-
-    @Override
-    public Topology createTopology(String topoName) {
-        return new StormTopology(topoName);
-    }
+    componentName.append('_');
+    componentName.append(index);
 
-    private String getComponentName(Class<? extends Processor> clazz) {
-        StringBuilder componentName = new 
StringBuilder(clazz.getCanonicalName());
-        String key = componentName.toString();
-        Integer index;
+    return componentName.toString();
+  }
 
-        if (!processorList.containsKey(key)) {
-            index = 1;
-        } else {
-            index = processorList.get(key) + 1;
-        }
-
-        processorList.put(key, index);
-
-        componentName.append('_');
-        componentName.append(index);
-
-        return componentName.toString();
-    }
-
-    @Override
-    public ProcessingItem createPi(Processor processor, int parallelism) {
-        return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), parallelism);
-    }
+  @Override
+  public ProcessingItem createPi(Processor processor, int parallelism) {
+    return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), parallelism);
+  }
 }

Reply via email to