http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
index 1351159..da5644d 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
@@ -35,154 +35,154 @@ import com.yahoo.labs.samoa.topology.ProcessingItem;
 import com.yahoo.labs.samoa.topology.Stream;
 
 /**
- * S4 Platform platform specific processing item, inherits from S4 ProcessinElemnt.
+ * S4 Platform platform specific processing item, inherits from S4
+ * ProcessinElemnt.
  * 
  * @author severien
- *
+ * 
  */
 public class S4ProcessingItem extends ProcessingElement implements
-               ProcessingItem {
-
-       public static final Logger logger = LoggerFactory
-                       .getLogger(S4ProcessingItem.class);
-
-       private Processor processor;
-       private int paralellismLevel;
-       private S4DoTask app;
-
-       private static final String NAME="PROCESSING-ITEM-";
-       private static int OBJ_COUNTER=0;
-       
-       /**
-        * Constructor of S4 ProcessingItem.
-        * 
-        * @param app : S4 application
-        */
-       public S4ProcessingItem(App app) {
-               super(app);
-               super.setName(NAME+OBJ_COUNTER);
-               OBJ_COUNTER++;
-               this.app = (S4DoTask) app;
-               this.paralellismLevel = 1;
-       }
-
-       @Override
-       public String getName() {
-               return super.getName();
-       }
-       
-       /**
-        * Gets processing item paralellism level.
-        * 
-        * @return int
-        */
-       public int getParalellismLevel() {
-               return paralellismLevel;
-       }
-
-       /**
-        * Sets processing item paralellism level.
-        * 
-        * @param paralellismLevel
-        */
-       public void setParalellismLevel(int paralellismLevel) {
-               this.paralellismLevel = paralellismLevel;
-       }
-
-       /**
-        * onEvent method.
-        * 
-        * @param event
-        */
-       public void onEvent(S4Event event) {
-               if (processor.process(event.getContentEvent()) == true) {
-                       close();
-               }
-       }
-
-       /**
-        * Sets S4 processing item processor.
-        * 
-        * @param processor
-        */
-       public void setProcessor(Processor processor) {
-               this.processor = processor;
-       }
-
-       // Methods from ProcessingItem
-       @Override
-       public Processor getProcessor() {
-               return processor;
-       }
-
-       /**
-        * KeyFinder sets the keys for a specific event.
-        * 
-        * @return KeyFinder
-        */
-       private KeyFinder<S4Event> getKeyFinder() {
-               KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() {
-                       @Override
-                       public List<String> get(S4Event s4event) {
-                               List<String> results = new ArrayList<String>();
-                               results.add(s4event.getKey());
-                               return results;
-                       }
-               };
-
-               return keyFinder;
-       }
-       
-       
-       @Override
-       public ProcessingItem connectInputAllStream(Stream inputStream) {
-
-               S4Stream stream = (S4Stream) inputStream;
-               stream.setParallelism(this.paralellismLevel);
-               stream.addStream(inputStream.getStreamId(),
-                               getKeyFinder(), this, S4Stream.BROADCAST);
-               return this;
-       }
-
-       
-       @Override
-       public ProcessingItem connectInputKeyStream(Stream inputStream) {
-
-               S4Stream stream = (S4Stream) inputStream;
-               stream.setParallelism(this.paralellismLevel);
-               stream.addStream(inputStream.getStreamId(),
-                               getKeyFinder(), this,S4Stream.GROUP_BY_KEY);
-
-               return this;
-       }
-       
-       @Override
-       public ProcessingItem connectInputShuffleStream(Stream inputStream) {
-               S4Stream stream = (S4Stream) inputStream;
-               stream.setParallelism(this.paralellismLevel);
-               stream.addStream(inputStream.getStreamId(),
-                               getKeyFinder(), this,S4Stream.SHUFFLE);
-
-               return this;
-       }
-
-       // Methods from ProcessingElement
-       @Override
-       protected void onCreate() {
-               logger.debug("PE ID {}", getId());              
-                               if (this.processor != null) {
-                       this.processor = this.processor.newProcessor(this.processor);
-                       this.processor.onCreate(Integer.parseInt(getId()));
-               }
-       }
-
-       @Override
-       protected void onRemove() {
-               // do nothing
-       }
-
-       @Override
-       public int getParallelism() {
-               return this.paralellismLevel;
-       }
+    ProcessingItem {
+
+  public static final Logger logger = LoggerFactory
+      .getLogger(S4ProcessingItem.class);
+
+  private Processor processor;
+  private int paralellismLevel;
+  private S4DoTask app;
+
+  private static final String NAME = "PROCESSING-ITEM-";
+  private static int OBJ_COUNTER = 0;
+
+  /**
+   * Constructor of S4 ProcessingItem.
+   * 
+   * @param app
+   *          : S4 application
+   */
+  public S4ProcessingItem(App app) {
+    super(app);
+    super.setName(NAME + OBJ_COUNTER);
+    OBJ_COUNTER++;
+    this.app = (S4DoTask) app;
+    this.paralellismLevel = 1;
+  }
+
+  @Override
+  public String getName() {
+    return super.getName();
+  }
+
+  /**
+   * Gets processing item paralellism level.
+   * 
+   * @return int
+   */
+  public int getParalellismLevel() {
+    return paralellismLevel;
+  }
+
+  /**
+   * Sets processing item paralellism level.
+   * 
+   * @param paralellismLevel
+   */
+  public void setParalellismLevel(int paralellismLevel) {
+    this.paralellismLevel = paralellismLevel;
+  }
+
+  /**
+   * onEvent method.
+   * 
+   * @param event
+   */
+  public void onEvent(S4Event event) {
+    if (processor.process(event.getContentEvent()) == true) {
+      close();
+    }
+  }
+
+  /**
+   * Sets S4 processing item processor.
+   * 
+   * @param processor
+   */
+  public void setProcessor(Processor processor) {
+    this.processor = processor;
+  }
+
+  // Methods from ProcessingItem
+  @Override
+  public Processor getProcessor() {
+    return processor;
+  }
+
+  /**
+   * KeyFinder sets the keys for a specific event.
+   * 
+   * @return KeyFinder
+   */
+  private KeyFinder<S4Event> getKeyFinder() {
+    KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() {
+      @Override
+      public List<String> get(S4Event s4event) {
+        List<String> results = new ArrayList<String>();
+        results.add(s4event.getKey());
+        return results;
+      }
+    };
+
+    return keyFinder;
+  }
+
+  @Override
+  public ProcessingItem connectInputAllStream(Stream inputStream) {
+
+    S4Stream stream = (S4Stream) inputStream;
+    stream.setParallelism(this.paralellismLevel);
+    stream.addStream(inputStream.getStreamId(),
+        getKeyFinder(), this, S4Stream.BROADCAST);
+    return this;
+  }
+
+  @Override
+  public ProcessingItem connectInputKeyStream(Stream inputStream) {
+
+    S4Stream stream = (S4Stream) inputStream;
+    stream.setParallelism(this.paralellismLevel);
+    stream.addStream(inputStream.getStreamId(),
+        getKeyFinder(), this, S4Stream.GROUP_BY_KEY);
+
+    return this;
+  }
+
+  @Override
+  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+    S4Stream stream = (S4Stream) inputStream;
+    stream.setParallelism(this.paralellismLevel);
+    stream.addStream(inputStream.getStreamId(),
+        getKeyFinder(), this, S4Stream.SHUFFLE);
+
+    return this;
+  }
+
+  // Methods from ProcessingElement
+  @Override
+  protected void onCreate() {
+    logger.debug("PE ID {}", getId());
+    if (this.processor != null) {
+      this.processor = this.processor.newProcessor(this.processor);
+      this.processor.onCreate(Integer.parseInt(getId()));
+    }
+  }
+
+  @Override
+  protected void onRemove() {
+    // do nothing
+  }
+
+  @Override
+  public int getParallelism() {
+    return this.paralellismLevel;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
index 78a3266..67a1385 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
@@ -35,151 +35,151 @@ import com.yahoo.labs.samoa.topology.AbstractStream;
  * S4 Platform specific stream.
  * 
  * @author severien
- *
+ * 
  */
 public class S4Stream extends AbstractStream {
 
-       public static final int SHUFFLE = 0;
-       public static final int GROUP_BY_KEY = 1;
-       public static final int BROADCAST = 2;
-
-       private static final Logger logger = LoggerFactory.getLogger(S4Stream.class);
-
-       private S4DoTask app;
-       private int processingItemParalellism;
-       private int shuffleCounter;
-
-       private static final String NAME = "STREAM-";
-       private static int OBJ_COUNTER = 0;
-       
-       /* The stream list */
-       public List<StreamType> streams;
-
-       public S4Stream(S4DoTask app) {
-               super();
-               this.app = app;
-               this.processingItemParalellism = 1;
-               this.shuffleCounter = 0;
-               this.streams = new ArrayList<StreamType>();
-               this.setStreamId(NAME+OBJ_COUNTER);
-               OBJ_COUNTER++;
-       }
-       
-       public S4Stream(S4DoTask app, S4ProcessingItem pi) {
-               super();
-               this.app = app;
-               this.processingItemParalellism = 1;
-               this.shuffleCounter = 0;
-               this.streams = new ArrayList<StreamType>();
-               this.setStreamId(NAME+OBJ_COUNTER);
-               OBJ_COUNTER++;
-               
-       }
-
-       /**
-        * 
-        * @return
-        */
-       public int getParallelism() {
-               return processingItemParalellism;
-       }
-
-       public void setParallelism(int parallelism) {
-               this.processingItemParalellism = parallelism;
-       }
-
-       public void addStream(String streamID, KeyFinder<S4Event> finder,
-                       S4ProcessingItem pi, int type) {
-               String streamName = streamID +"_"+pi.getName(); 
-               org.apache.s4.core.Stream<S4Event> stream = this.app.createStream(
-                               streamName, pi);
-               stream.setName(streamName);
-               logger.debug("Stream name S4Stream {}", streamName);
-               if (finder != null)
-                       stream.setKey(finder);
-               this.streams.add(new StreamType(stream, type));
-
-       }
-
-       @Override
-       public void put(ContentEvent event) {
-
-               for (int i = 0; i < streams.size(); i++) {
-
-                       switch (streams.get(i).getType()) {
-                       case SHUFFLE:
-                               S4Event s4event = new S4Event(event);
-                               s4event.setStreamId(streams.get(i).getStream().getName());
-                               if(getParallelism() == 1) {
-                                       s4event.setKey("0");
-                               }else {
-                                       s4event.setKey(Integer.toString(shuffleCounter));
-                               }
-                               streams.get(i).getStream().put(s4event);
-                               shuffleCounter++;
-                                if (shuffleCounter >= (getParallelism())) {
-                                       shuffleCounter = 0;
-                               }
-                               
-                               break;
-
-                       case GROUP_BY_KEY:
-                               S4Event s4event1 = new S4Event(event);
-                               s4event1.setStreamId(streams.get(i).getStream().getName());
-                               HashCodeBuilder hb = new HashCodeBuilder();
-                               hb.append(event.getKey());
-                               String key = Integer.toString(hb.build() % getParallelism());
-                               s4event1.setKey(key);
-                               streams.get(i).getStream().put(s4event1);
-                               break;
-                               
-                       case BROADCAST:
-                               for (int p = 0; p < this.getParallelism(); p++) {
-                                       S4Event s4event2 = new S4Event(event);
-                                       s4event2.setStreamId(streams.get(i).getStream().getName());
-                                       s4event2.setKey(Integer.toString(p));
-                                       streams.get(i).getStream().put(s4event2);
-                               }
-                               break;
-
-                       default:
-                               break;
-                       }
-
-                       
-               }
-
-       }
-
-       /**
-        * Subclass for definig stream connection type
-        * @author severien
-        *
-        */
-       class StreamType {
-               org.apache.s4.core.Stream<S4Event> stream;
-               int type;
-
-               public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
-                       this.stream = s;
-                       this.type = t;
-               }
-
-               public org.apache.s4.core.Stream<S4Event> getStream() {
-                       return stream;
-               }
-
-               public void setStream(org.apache.s4.core.Stream<S4Event> stream) {
-                       this.stream = stream;
-               }
-
-               public int getType() {
-                       return type;
-               }
-
-               public void setType(int type) {
-                       this.type = type;
-               }
-
-       }
+  public static final int SHUFFLE = 0;
+  public static final int GROUP_BY_KEY = 1;
+  public static final int BROADCAST = 2;
+
+  private static final Logger logger = LoggerFactory.getLogger(S4Stream.class);
+
+  private S4DoTask app;
+  private int processingItemParalellism;
+  private int shuffleCounter;
+
+  private static final String NAME = "STREAM-";
+  private static int OBJ_COUNTER = 0;
+
+  /* The stream list */
+  public List<StreamType> streams;
+
+  public S4Stream(S4DoTask app) {
+    super();
+    this.app = app;
+    this.processingItemParalellism = 1;
+    this.shuffleCounter = 0;
+    this.streams = new ArrayList<StreamType>();
+    this.setStreamId(NAME + OBJ_COUNTER);
+    OBJ_COUNTER++;
+  }
+
+  public S4Stream(S4DoTask app, S4ProcessingItem pi) {
+    super();
+    this.app = app;
+    this.processingItemParalellism = 1;
+    this.shuffleCounter = 0;
+    this.streams = new ArrayList<StreamType>();
+    this.setStreamId(NAME + OBJ_COUNTER);
+    OBJ_COUNTER++;
+
+  }
+
+  /**
+   * 
+   * @return
+   */
+  public int getParallelism() {
+    return processingItemParalellism;
+  }
+
+  public void setParallelism(int parallelism) {
+    this.processingItemParalellism = parallelism;
+  }
+
+  public void addStream(String streamID, KeyFinder<S4Event> finder,
+      S4ProcessingItem pi, int type) {
+    String streamName = streamID + "_" + pi.getName();
+    org.apache.s4.core.Stream<S4Event> stream = this.app.createStream(
+        streamName, pi);
+    stream.setName(streamName);
+    logger.debug("Stream name S4Stream {}", streamName);
+    if (finder != null)
+      stream.setKey(finder);
+    this.streams.add(new StreamType(stream, type));
+
+  }
+
+  @Override
+  public void put(ContentEvent event) {
+
+    for (int i = 0; i < streams.size(); i++) {
+
+      switch (streams.get(i).getType()) {
+      case SHUFFLE:
+        S4Event s4event = new S4Event(event);
+        s4event.setStreamId(streams.get(i).getStream().getName());
+        if (getParallelism() == 1) {
+          s4event.setKey("0");
+        } else {
+          s4event.setKey(Integer.toString(shuffleCounter));
+        }
+        streams.get(i).getStream().put(s4event);
+        shuffleCounter++;
+        if (shuffleCounter >= (getParallelism())) {
+          shuffleCounter = 0;
+        }
+
+        break;
+
+      case GROUP_BY_KEY:
+        S4Event s4event1 = new S4Event(event);
+        s4event1.setStreamId(streams.get(i).getStream().getName());
+        HashCodeBuilder hb = new HashCodeBuilder();
+        hb.append(event.getKey());
+        String key = Integer.toString(hb.build() % getParallelism());
+        s4event1.setKey(key);
+        streams.get(i).getStream().put(s4event1);
+        break;
+
+      case BROADCAST:
+        for (int p = 0; p < this.getParallelism(); p++) {
+          S4Event s4event2 = new S4Event(event);
+          s4event2.setStreamId(streams.get(i).getStream().getName());
+          s4event2.setKey(Integer.toString(p));
+          streams.get(i).getStream().put(s4event2);
+        }
+        break;
+
+      default:
+        break;
+      }
+
+    }
+
+  }
+
+  /**
+   * Subclass for definig stream connection type
+   * 
+   * @author severien
+   * 
+   */
+  class StreamType {
+    org.apache.s4.core.Stream<S4Event> stream;
+    int type;
+
+    public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
+      this.stream = s;
+      this.type = t;
+    }
+
+    public org.apache.s4.core.Stream<S4Event> getStream() {
+      return stream;
+    }
+
+    public void setStream(org.apache.s4.core.Stream<S4Event> stream) {
+      this.stream = stream;
+    }
+
+    public int getType() {
+      return type;
+    }
+
+    public void setType(int type) {
+      this.type = type;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
index c7ef92c..cf5a9b3 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
@@ -44,103 +44,102 @@ import com.beust.jcommander.Parameters;
 
 public class S4Submitter implements ISubmitter {
 
-       private static Logger logger = LoggerFactory.getLogger(S4Submitter.class);
-
-       @Override
-       public void deployTask(Task task) {
-               // TODO: Get application FROM HTTP server
-               // TODO: Initializa a http server to serve the app package
-               
-               String appURIString = null;
-//             File app = new File(System.getProperty("user.dir")
-//                             + "/src/site/dist/SAMOA-S4-0.1-dist.jar");
-               
-               // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar
-               try {
-                       URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar");
-                       appURIString = appURL.toString();
-               } catch (MalformedURLException e1) {
-                       e1.printStackTrace();
-               }
-               
-//             try {
-//                     appURIString = app.toURI().toURL().toString();
-//             } catch (MalformedURLException e) {
-//                     e.printStackTrace();
-//             }
-               if (task == null) {
-                       logger.error("Can't execute since evaluation task is not set!");
-                       return;
-               } else {
-                       logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ",
-                                       task.getClass().getSimpleName(), appURIString);
-               }
-
-               String[] args = { "-c=testCluster2",
-                               "-appClass=" + S4DoTask.class.getName(),
-                               "-appName=" + "samoaApp",
-                               "-p=evalTask=" + task.getClass().getSimpleName(),
-                               "-zk=localhost:2181", "-s4r=" + appURIString , "-emc=" + SamoaSerializerModule.class.getName()};
-               // "-emc=" + S4MOAModule.class.getName(),
-               // "@" +
-               // Resources.getResource("s4moa.properties").getFile(),
-
-               S4Config s4config = new S4Config();
-               JCommander jc = new JCommander(s4config);
-               jc.parse(args);
-
-               Map<String, String> namedParameters = new HashMap<String, String>();
-               for (String parameter : s4config.namedParameters) {
-                       String[] param = parameter.split("=");
-                       namedParameters.put(param[0], param[1]);
-               }
-
-               AppConfig config = new AppConfig.Builder()
-                               .appClassName(s4config.appClass).appName(s4config.appName)
-                               .appURI(s4config.appURI).namedParameters(namedParameters)
-                               .build();
-
-               DeploymentUtils.initAppConfig(config, s4config.clusterName, true,
-                               s4config.zkString);
-
-               System.out.println("Suposedly deployed on S4");
-       }
-
-       
-       public void initHTTPServer() {
-               
-       }
-       
-       @Parameters(separators = "=")
-       public static class S4Config {
-
-               @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true)
-               String clusterName = null;
-
-               @Parameter(names = "-appClass", description = "Main App class", required = false)
-               String appClass = null;
-
-               @Parameter(names = "-appName", description = "Application name", required = false)
-               String appName = null;
-
-               @Parameter(names = "-s4r", description = "Application URI", required = false)
-               String appURI = null;
-
-               @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false)
-               String zkString = null;
-
-               @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false)
-               List<String> extraModules = new ArrayList<String>();
-
-               @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration "
-                               + "parameters, taking precedence over homonymous configuration parameters from configuration files. "
-                               + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class)
-               List<String> namedParameters = new ArrayList<String>();
-
-       }
-
-       @Override
-       public void setLocal(boolean bool) {
-               // TODO S4 works the same for local and distributed environments
-       }
+  private static Logger logger = LoggerFactory.getLogger(S4Submitter.class);
+
+  @Override
+  public void deployTask(Task task) {
+    // TODO: Get application FROM HTTP server
+    // TODO: Initializa a http server to serve the app package
+
+    String appURIString = null;
+    // File app = new File(System.getProperty("user.dir")
+    // + "/src/site/dist/SAMOA-S4-0.1-dist.jar");
+
+    // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar
+    try {
+      URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar");
+      appURIString = appURL.toString();
+    } catch (MalformedURLException e1) {
+      e1.printStackTrace();
+    }
+
+    // try {
+    // appURIString = app.toURI().toURL().toString();
+    // } catch (MalformedURLException e) {
+    // e.printStackTrace();
+    // }
+    if (task == null) {
+      logger.error("Can't execute since evaluation task is not set!");
+      return;
+    } else {
+      logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ",
+          task.getClass().getSimpleName(), appURIString);
+    }
+
+    String[] args = { "-c=testCluster2",
+        "-appClass=" + S4DoTask.class.getName(),
+        "-appName=" + "samoaApp",
+        "-p=evalTask=" + task.getClass().getSimpleName(),
+        "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + SamoaSerializerModule.class.getName() };
+    // "-emc=" + S4MOAModule.class.getName(),
+    // "@" +
+    // Resources.getResource("s4moa.properties").getFile(),
+
+    S4Config s4config = new S4Config();
+    JCommander jc = new JCommander(s4config);
+    jc.parse(args);
+
+    Map<String, String> namedParameters = new HashMap<String, String>();
+    for (String parameter : s4config.namedParameters) {
+      String[] param = parameter.split("=");
+      namedParameters.put(param[0], param[1]);
+    }
+
+    AppConfig config = new AppConfig.Builder()
+        .appClassName(s4config.appClass).appName(s4config.appName)
+        .appURI(s4config.appURI).namedParameters(namedParameters)
+        .build();
+
+    DeploymentUtils.initAppConfig(config, s4config.clusterName, true,
+        s4config.zkString);
+
+    System.out.println("Suposedly deployed on S4");
+  }
+
+  public void initHTTPServer() {
+
+  }
+
+  @Parameters(separators = "=")
+  public static class S4Config {
+
+    @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", required = true)
+    String clusterName = null;
+
+    @Parameter(names = "-appClass", description = "Main App class", required = false)
+    String appClass = null;
+
+    @Parameter(names = "-appName", description = "Application name", required = false)
+    String appName = null;
+
+    @Parameter(names = "-s4r", description = "Application URI", required = false)
+    String appURI = null;
+
+    @Parameter(names = "-zk", description = "ZooKeeper connection string", required = false)
+    String zkString = null;
+
+    @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Comma-separated list of additional configuration modules (they will be instantiated through their constructor without arguments).", required = false)
+    List<String> extraModules = new ArrayList<String>();
+
+    @Parameter(names = { "-p", "-namedStringParameters" }, description = "Comma-separated list of inline configuration "
+        + "parameters, taking precedence over homonymous configuration parameters from configuration files. "
+        + "Syntax: '-p=name1=value1,name2=value2 '", required = false, converter = ParsingUtils.InlineConfigParameterConverter.class)
+    List<String> namedParameters = new ArrayList<String>();
+
+  }
+
+  @Override
+  public void setLocal(boolean bool) {
+    // TODO S4 works the same for local and distributed environments
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
index 6bef0e8..2f7661d 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
@@ -24,38 +24,40 @@ import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
 import com.yahoo.labs.samoa.topology.AbstractTopology;
 
 public class S4Topology extends AbstractTopology {
-       
-       // CASEY: it seems evaluationTask is not used. 
-       // Remove it for now
-    
-//     private String _evaluationTask;
-
-//    S4Topology(String topoName, String evalTask) {
-//        super(topoName);
-//    }
-//
-//    S4Topology(String topoName) {
-//        this(topoName, null);
-//    }
-
-//    @Override
-//    public void setEvaluationTask(String evalTask) {
-//        _evaluationTask = evalTask;
-//    }
-//
-//    @Override
-//    public String getEvaluationTask() {
-//        return _evaluationTask;
-//    }
-    
-       S4Topology(String topoName) {
-               super(topoName);
-       }
-       
-    protected EntranceProcessingItem getEntranceProcessingItem() {
-       if (this.getEntranceProcessingItems() == null) return null;
-       if (this.getEntranceProcessingItems().size() < 1) return null;
-       // TODO: support multiple entrance PIs
-       return (EntranceProcessingItem)this.getEntranceProcessingItems().toArray()[0];
-    }
+
+  // CASEY: it seems evaluationTask is not used.
+  // Remove it for now
+
+  // private String _evaluationTask;
+
+  // S4Topology(String topoName, String evalTask) {
+  // super(topoName);
+  // }
+  //
+  // S4Topology(String topoName) {
+  // this(topoName, null);
+  // }
+
+  // @Override
+  // public void setEvaluationTask(String evalTask) {
+  // _evaluationTask = evalTask;
+  // }
+  //
+  // @Override
+  // public String getEvaluationTask() {
+  // return _evaluationTask;
+  // }
+
+  S4Topology(String topoName) {
+    super(topoName);
+  }
+
+  protected EntranceProcessingItem getEntranceProcessingItem() {
+    if (this.getEntranceProcessingItems() == null)
+      return null;
+    if (this.getEntranceProcessingItems().size() < 1)
+      return null;
+    // TODO: support multiple entrance PIs
+    return (EntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0];
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
index 4ae2296..61648e6 100644
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
@@ -32,68 +32,69 @@ import com.google.inject.assistedinject.Assisted;
 import com.yahoo.labs.samoa.learners.classifiers.trees.AttributeContentEvent;
 import com.yahoo.labs.samoa.learners.classifiers.trees.ComputeContentEvent;
 
-public class SamoaSerializer implements SerializerDeserializer{
+public class SamoaSerializer implements SerializerDeserializer {
 
-       private ThreadLocal<Kryo> kryoThreadLocal;
-    private ThreadLocal<Output> outputThreadLocal;
+  private ThreadLocal<Kryo> kryoThreadLocal;
+  private ThreadLocal<Output> outputThreadLocal;
 
-    private int initialBufferSize = 2048;
-    private int maxBufferSize = 256 * 1024;
+  private int initialBufferSize = 2048;
+  private int maxBufferSize = 256 * 1024;
 
-    public void setMaxBufferSize(int maxBufferSize) {
-        this.maxBufferSize = maxBufferSize;
-    }
+  public void setMaxBufferSize(int maxBufferSize) {
+    this.maxBufferSize = maxBufferSize;
+  }
 
-    /**
-     * 
-     * @param classLoader
-     *            classloader able to handle classes to serialize/deserialize. 
For instance, application-level events
-     *            can only be handled by the application classloader.
-     */
-    @Inject
-    public SamoaSerializer(@Assisted final ClassLoader classLoader) {
-        kryoThreadLocal = new ThreadLocal<Kryo>() {
+  /**
+   * 
+   * @param classLoader
+   *          classloader able to handle classes to serialize/deserialize. For
+   *          instance, application-level events can only be handled by the
+   *          application classloader.
+   */
+  @Inject
+  public SamoaSerializer(@Assisted final ClassLoader classLoader) {
+    kryoThreadLocal = new ThreadLocal<Kryo>() {
 
-            @Override
-            protected Kryo initialValue() {
-                Kryo kryo = new Kryo();
-                kryo.setClassLoader(classLoader);
-                kryo.register(AttributeContentEvent.class, new 
AttributeContentEvent.AttributeCEFullPrecSerializer());
-                kryo.register(ComputeContentEvent.class, new 
ComputeContentEvent.ComputeCEFullPrecSerializer());
-                kryo.setRegistrationRequired(false);
-                return kryo;
-            }
-        };
+      @Override
+      protected Kryo initialValue() {
+        Kryo kryo = new Kryo();
+        kryo.setClassLoader(classLoader);
+        kryo.register(AttributeContentEvent.class, new 
AttributeContentEvent.AttributeCEFullPrecSerializer());
+        kryo.register(ComputeContentEvent.class, new 
ComputeContentEvent.ComputeCEFullPrecSerializer());
+        kryo.setRegistrationRequired(false);
+        return kryo;
+      }
+    };
 
-        outputThreadLocal = new ThreadLocal<Output>() {
-            @Override
-            protected Output initialValue() {
-                Output output = new Output(initialBufferSize, maxBufferSize);
-                return output;
-            }
-        };
+    outputThreadLocal = new ThreadLocal<Output>() {
+      @Override
+      protected Output initialValue() {
+        Output output = new Output(initialBufferSize, maxBufferSize);
+        return output;
+      }
+    };
 
-    }
+  }
 
-    @Override
-    public Object deserialize(ByteBuffer rawMessage) {
-        Input input = new Input(rawMessage.array());
-        try {
-            return kryoThreadLocal.get().readClassAndObject(input);
-        } finally {
-            input.close();
-        }
+  @Override
+  public Object deserialize(ByteBuffer rawMessage) {
+    Input input = new Input(rawMessage.array());
+    try {
+      return kryoThreadLocal.get().readClassAndObject(input);
+    } finally {
+      input.close();
     }
+  }
 
-    @SuppressWarnings("resource")
-    @Override
-    public ByteBuffer serialize(Object message) {
-        Output output = outputThreadLocal.get();
-        try {
-            kryoThreadLocal.get().writeClassAndObject(output, message);
-            return ByteBuffer.wrap(output.toBytes());
-        } finally {
-            output.clear();
-        }
+  @SuppressWarnings("resource")
+  @Override
+  public ByteBuffer serialize(Object message) {
+    Output output = outputThreadLocal.get();
+    try {
+      kryoThreadLocal.get().writeClassAndObject(output, message);
+      return ByteBuffer.wrap(output.toBytes());
+    } finally {
+      output.clear();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
index 311e449..a367eb5 100644
--- 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
+++ 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
@@ -26,10 +26,10 @@ import com.google.inject.AbstractModule;
 
 public class SamoaSerializerModule extends AbstractModule {
 
-       @Override
-       protected void configure() {
-               bind(SerializerDeserializer.class).to(SamoaSerializer.class);
-               
-       }
+  @Override
+  protected void configure() {
+    bind(SerializerDeserializer.class).to(SamoaSerializer.class);
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
index 45dd901..6c6103c 100644
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
@@ -44,184 +44,183 @@ import com.yahoo.labs.samoa.utils.SystemsUtils;
  */
 public class SamzaDoTask {
 
-       private static final Logger logger = 
LoggerFactory.getLogger(SamzaDoTask.class);
-       
-       private static final String LOCAL_MODE = "local";
-       private static final String YARN_MODE = "yarn";
-       
-       // FLAGS
-       private static final String YARN_CONF_FLAG = "--yarn_home";
-       private static final String MODE_FLAG = "--mode";
-       private static final String ZK_FLAG = "--zookeeper";
-       private static final String KAFKA_FLAG = "--kafka";
-       private static final String KAFKA_REPLICATION_FLAG = 
"--kafka_replication_factor";
-       private static final String CHECKPOINT_FREQ_FLAG = 
"--checkpoint_frequency";
-       private static final String JAR_PACKAGE_FLAG = "--jar_package";
-       private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir";
-       private static final String AM_MEMORY_FLAG = "--yarn_am_mem";
-       private static final String CONTAINER_MEMORY_FLAG = 
"--yarn_container_mem";
-       private static final String PI_PER_CONTAINER_FLAG = 
"--pi_per_container";
-       
-       private static final String KRYO_REGISTER_FLAG = "--kryo_register";
-       
-       // config values
-       private static int kafkaReplicationFactor = 1;
-       private static int checkpointFrequency = 60000;
-       private static String kafka = "localhost:9092";
-       private static String zookeeper = "localhost:2181";
-       private static boolean isLocal = true;
-       private static String yarnConfHome = null;
-       private static String samoaHDFSDir = null;
-       private static String jarPackagePath = null;
-       private static int amMem = 1024;
-       private static int containerMem = 1024;
-       private static int piPerContainer = 2;
-       private static String kryoRegisterFile = null;
-       
-       /*
-        * 1. Read arguments
-        * 2. Construct topology/task
-        * 3. Upload the JAR to HDFS if we are running on YARN
-        * 4. Submit topology to SamzaEngine
-        */
-       public static void main(String[] args) {
-               // Read arguments
-               List<String> tmpArgs = new 
ArrayList<String>(Arrays.asList(args));
-               parseArguments(tmpArgs);
-               
-               args = tmpArgs.toArray(new String[0]);
-               
-               // Init Task
-               StringBuilder cliString = new StringBuilder();
-        for (int i = 0; i < args.length; i++) {
-            cliString.append(" ").append(args[i]);
+  private static final Logger logger = 
LoggerFactory.getLogger(SamzaDoTask.class);
+
+  private static final String LOCAL_MODE = "local";
+  private static final String YARN_MODE = "yarn";
+
+  // FLAGS
+  private static final String YARN_CONF_FLAG = "--yarn_home";
+  private static final String MODE_FLAG = "--mode";
+  private static final String ZK_FLAG = "--zookeeper";
+  private static final String KAFKA_FLAG = "--kafka";
+  private static final String KAFKA_REPLICATION_FLAG = 
"--kafka_replication_factor";
+  private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency";
+  private static final String JAR_PACKAGE_FLAG = "--jar_package";
+  private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir";
+  private static final String AM_MEMORY_FLAG = "--yarn_am_mem";
+  private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem";
+  private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container";
+
+  private static final String KRYO_REGISTER_FLAG = "--kryo_register";
+
+  // config values
+  private static int kafkaReplicationFactor = 1;
+  private static int checkpointFrequency = 60000;
+  private static String kafka = "localhost:9092";
+  private static String zookeeper = "localhost:2181";
+  private static boolean isLocal = true;
+  private static String yarnConfHome = null;
+  private static String samoaHDFSDir = null;
+  private static String jarPackagePath = null;
+  private static int amMem = 1024;
+  private static int containerMem = 1024;
+  private static int piPerContainer = 2;
+  private static String kryoRegisterFile = null;
+
+  /*
+   * 1. Read arguments 2. Construct topology/task 3. Upload the JAR to HDFS if
+   * we are running on YARN 4. Submit topology to SamzaEngine
+   */
+  public static void main(String[] args) {
+    // Read arguments
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+    parseArguments(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    // Init Task
+    StringBuilder cliString = new StringBuilder();
+    for (int i = 0; i < args.length; i++) {
+      cliString.append(" ").append(args[i]);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+    System.out.println("Command line string = " + cliString.toString());
+
+    Task task = null;
+    try {
+      task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, null);
+      logger.info("Sucessfully instantiating {}", 
task.getClass().getCanonicalName());
+    } catch (Exception e) {
+      logger.error("Fail to initialize the task", e);
+      System.out.println("Fail to initialize the task" + e);
+      return;
+    }
+    task.setFactory(new SamzaComponentFactory());
+    task.init();
+
+    // Upload JAR file to HDFS
+    String hdfsPath = null;
+    if (!isLocal) {
+      Path path = FileSystems.getDefault().getPath(jarPackagePath);
+      hdfsPath = uploadJarToHDFS(path.toFile());
+      if (hdfsPath == null) {
+        System.out.println("Fail uploading JAR file \"" + 
path.toAbsolutePath().toString() + "\" to HDFS.");
+        return;
+      }
+    }
+
+    // Set parameters
+    SamzaEngine.getEngine()
+        .setLocalMode(isLocal)
+        .setZooKeeper(zookeeper)
+        .setKafka(kafka)
+        .setYarnPackage(hdfsPath)
+        .setKafkaReplicationFactor(kafkaReplicationFactor)
+        .setConfigHome(yarnConfHome)
+        .setAMMemory(amMem)
+        .setContainerMemory(containerMem)
+        .setPiPerContainerRatio(piPerContainer)
+        .setKryoRegisterFile(kryoRegisterFile)
+        .setCheckpointFrequency(checkpointFrequency);
+
+    // Submit topology
+    SamzaEngine.submitTopology((SamzaTopology) task.getTopology());
+
+  }
+
+  private static boolean isLocalMode(String mode) {
+    return mode.equals(LOCAL_MODE);
+  }
+
+  private static void parseArguments(List<String> args) {
+    for (int i = args.size() - 1; i >= 0; i--) {
+      String arg = args.get(i).trim();
+      String[] splitted = arg.split("=", 2);
+
+      if (splitted.length >= 2) {
+        // YARN config folder which contains conf/core-site.xml,
+        // conf/hdfs-site.xml, conf/yarn-site.xml
+        if (splitted[0].equals(YARN_CONF_FLAG)) {
+          yarnConfHome = splitted[1];
+          args.remove(i);
+        }
+        // host:port for zookeeper cluster
+        else if (splitted[0].equals(ZK_FLAG)) {
+          zookeeper = splitted[1];
+          args.remove(i);
+        }
+        // host:port,... for kafka broker(s)
+        else if (splitted[0].equals(KAFKA_FLAG)) {
+          kafka = splitted[1];
+          args.remove(i);
+        }
+        // whether we are running Samza in Local mode or YARN mode
+        else if (splitted[0].equals(MODE_FLAG)) {
+          isLocal = isLocalMode(splitted[1]);
+          args.remove(i);
+        }
+        // memory requirement for YARN application master
+        else if (splitted[0].equals(AM_MEMORY_FLAG)) {
+          amMem = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // memory requirement for YARN worker container
+        else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) {
+          containerMem = Integer.parseInt(splitted[1]);
+          args.remove(i);
         }
-        logger.debug("Command line string = {}", cliString.toString());
-        System.out.println("Command line string = " + cliString.toString());
-        
-               Task task = null;
-        try {
-            task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, null);
-            logger.info("Sucessfully instantiating {}", 
task.getClass().getCanonicalName());
-        } catch (Exception e) {
-            logger.error("Fail to initialize the task", e);
-            System.out.println("Fail to initialize the task" + e);
-            return;
+        // the path to JAR archive that we need to upload to HDFS
+        else if (splitted[0].equals(JAR_PACKAGE_FLAG)) {
+          jarPackagePath = splitted[1];
+          args.remove(i);
         }
-               task.setFactory(new SamzaComponentFactory());
-               task.init();
-               
-               // Upload JAR file to HDFS
-               String hdfsPath = null;
-               if (!isLocal) {
-                       Path path = 
FileSystems.getDefault().getPath(jarPackagePath);
-                       hdfsPath = uploadJarToHDFS(path.toFile());
-                       if(hdfsPath == null) {
-                               System.out.println("Fail uploading JAR file 
\""+path.toAbsolutePath().toString()+"\" to HDFS.");
-                               return;
-                       }
-               }
-               
-               // Set parameters
-               SamzaEngine.getEngine()
-               .setLocalMode(isLocal)
-               .setZooKeeper(zookeeper)
-               .setKafka(kafka)
-               .setYarnPackage(hdfsPath)
-               .setKafkaReplicationFactor(kafkaReplicationFactor)
-               .setConfigHome(yarnConfHome)
-               .setAMMemory(amMem)
-               .setContainerMemory(containerMem)
-               .setPiPerContainerRatio(piPerContainer)
-               .setKryoRegisterFile(kryoRegisterFile)
-               .setCheckpointFrequency(checkpointFrequency);
-               
-               // Submit topology
-               SamzaEngine.submitTopology((SamzaTopology)task.getTopology());
-               
-       }
-       
-       private static boolean isLocalMode(String mode) {
-               return mode.equals(LOCAL_MODE);
-       }
-       
-       private static void parseArguments(List<String> args) {
-               for (int i=args.size()-1; i>=0; i--) {
-                       String arg = args.get(i).trim();
-                       String[] splitted = arg.split("=",2);
-                       
-                       if (splitted.length >= 2) {
-                               // YARN config folder which contains 
conf/core-site.xml,
-                               // conf/hdfs-site.xml, conf/yarn-site.xml
-                               if (splitted[0].equals(YARN_CONF_FLAG)) {
-                                       yarnConfHome = splitted[1];
-                                       args.remove(i);
-                               }
-                               // host:port for zookeeper cluster
-                               else if (splitted[0].equals(ZK_FLAG)) {
-                                       zookeeper = splitted[1];
-                                       args.remove(i);
-                               }
-                               // host:port,... for kafka broker(s)
-                               else if (splitted[0].equals(KAFKA_FLAG)) {
-                                       kafka = splitted[1];
-                                       args.remove(i);
-                               }
-                               // whether we are running Samza in Local mode 
or YARN mode 
-                               else if (splitted[0].equals(MODE_FLAG)) {
-                                       isLocal = isLocalMode(splitted[1]);
-                                       args.remove(i);
-                               }
-                               // memory requirement for YARN application 
master
-                               else if (splitted[0].equals(AM_MEMORY_FLAG)) {
-                                       amMem = Integer.parseInt(splitted[1]);
-                                       args.remove(i);
-                               }
-                               // memory requirement for YARN worker container
-                               else if 
(splitted[0].equals(CONTAINER_MEMORY_FLAG)) {
-                                       containerMem = 
Integer.parseInt(splitted[1]);
-                                       args.remove(i);
-                               }
-                               // the path to JAR archive that we need to 
upload to HDFS
-                               else if (splitted[0].equals(JAR_PACKAGE_FLAG)) {
-                                       jarPackagePath = splitted[1];
-                                       args.remove(i);
-                               }
-                               // the HDFS dir for SAMOA files
-                               else if 
(splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) {
-                                       samoaHDFSDir = splitted[1];
-                                       if (samoaHDFSDir.length() < 1) 
samoaHDFSDir = null;
-                                       args.remove(i);
-                               }
-                               // number of max PI instances per container
-                               // this will be used to compute the number of 
containers 
-                               // AM will request for the job
-                               else if 
(splitted[0].equals(PI_PER_CONTAINER_FLAG)) {
-                                       piPerContainer = 
Integer.parseInt(splitted[1]);
-                                       args.remove(i);
-                               }
-                               // kafka streams replication factor
-                               else if 
(splitted[0].equals(KAFKA_REPLICATION_FLAG)) {
-                                       kafkaReplicationFactor = 
Integer.parseInt(splitted[1]);
-                                       args.remove(i);
-                               }
-                               // checkpoint frequency in ms
-                               else if 
(splitted[0].equals(CHECKPOINT_FREQ_FLAG)) {
-                                       checkpointFrequency = 
Integer.parseInt(splitted[1]);
-                                       args.remove(i);
-                               }
-                               // the file contains registration information 
for Kryo serializer
-                               else if 
(splitted[0].equals(KRYO_REGISTER_FLAG)) {
-                                       kryoRegisterFile = splitted[1];
-                                       args.remove(i);
-                               }
-                       }
-               }
-       }
-       
-       private static String uploadJarToHDFS(File file) {
-               SystemsUtils.setHadoopConfigHome(yarnConfHome);
-               SystemsUtils.setSAMOADir(samoaHDFSDir);
-               return SystemsUtils.copyToHDFS(file, file.getName());
-       }
+        // the HDFS dir for SAMOA files
+        else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) {
+          samoaHDFSDir = splitted[1];
+          if (samoaHDFSDir.length() < 1)
+            samoaHDFSDir = null;
+          args.remove(i);
+        }
+        // number of max PI instances per container
+        // this will be used to compute the number of containers
+        // AM will request for the job
+        else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) {
+          piPerContainer = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // kafka streams replication factor
+        else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) {
+          kafkaReplicationFactor = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // checkpoint frequency in ms
+        else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) {
+          checkpointFrequency = Integer.parseInt(splitted[1]);
+          args.remove(i);
+        }
+        // the file contains registration information for Kryo serializer
+        else if (splitted[0].equals(KRYO_REGISTER_FLAG)) {
+          kryoRegisterFile = splitted[1];
+          args.remove(i);
+        }
+      }
+    }
+  }
+
+  private static String uploadJarToHDFS(File file) {
+    SystemsUtils.setHadoopConfigHome(yarnConfHome);
+    SystemsUtils.setSAMOADir(samoaHDFSDir);
+    return SystemsUtils.copyToHDFS(file, file.getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
index 362e0a5..1a4b57f 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
@@ -32,26 +32,25 @@ import 
org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import 
com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer;
 
 /**
- * Implementation of Samza's SystemFactory
- * Samza will use this factory to get our custom consumer
- * which gets the events from SAMOA EntranceProcessor
- * and feed them to EntranceProcessingItem task
+ * Implementation of Samza's SystemFactory Samza will use this factory to get
+ * our custom consumer which gets the events from SAMOA EntranceProcessor and
+ * feed them to EntranceProcessingItem task
  * 
  * @author Anh Thu Vu
  */
 public class SamoaSystemFactory implements SystemFactory {
-       @Override
-       public SystemAdmin getAdmin(String systemName, Config config) {
-               return new SinglePartitionWithoutOffsetsSystemAdmin();
-       }
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return new SinglePartitionWithoutOffsetsSystemAdmin();
+  }
 
-       @Override
-       public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
-               return new SamoaSystemConsumer(systemName, config);
-       }
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+    return new SamoaSystemConsumer(systemName, config);
+  }
 
-       @Override
-       public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
-               throw new SamzaException("This implementation is not supposed 
to produce anything.");
-       }
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+    throw new SamzaException("This implementation is not supposed to produce 
anything.");
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
index d71d97b..278b1f2 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
@@ -35,28 +35,28 @@ import com.yahoo.labs.samoa.topology.Topology;
  * @author Anh Thu Vu
  */
 public class SamzaComponentFactory implements ComponentFactory {
-       @Override
-       public ProcessingItem createPi(Processor processor) {
-               return this.createPi(processor, 1);
-       }
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
 
-       @Override
-       public ProcessingItem createPi(Processor processor, int parallelism) {
-               return new SamzaProcessingItem(processor, parallelism);
-       }
+  @Override
+  public ProcessingItem createPi(Processor processor, int parallelism) {
+    return new SamzaProcessingItem(processor, parallelism);
+  }
 
-       @Override
-       public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
-               return new SamzaEntranceProcessingItem(entranceProcessor);
-       }
-       
-       @Override
-       public Stream createStream(IProcessingItem sourcePi) {
-               return new SamzaStream(sourcePi);
-       }
-       
-       @Override
-       public Topology createTopology(String topoName) {
-               return new SamzaTopology(topoName);
-       }
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+    return new SamzaEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new SamzaStream(sourcePi);
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new SamzaTopology(topoName);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
index 7339443..e3141f8 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
@@ -35,163 +35,162 @@ import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
 import com.yahoo.labs.samoa.utils.SystemsUtils;
 
 /**
- * This class will submit a list of Samza jobs with 
- * the Configs generated from the input topology
+ * This class will submit a list of Samza jobs with the Configs generated from
+ * the input topology
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SamzaEngine {
-       
-       private static final Logger logger = 
LoggerFactory.getLogger(SamzaEngine.class);
-       
-       /*
-        * Singleton instance
-        */
-       private static SamzaEngine engine = new SamzaEngine();
-       
-       private String zookeeper;
-       private String kafka;
-       private int kafkaReplicationFactor;
-       private boolean isLocalMode;
-       private String yarnPackagePath;
-       private String yarnConfHome;
-       
-       private String kryoRegisterFile;
-       
-       private int amMem;
-       private int containerMem;
-       private int piPerContainerRatio;
-       
-       private int checkpointFrequency;
-       
-       private void _submitTopology(SamzaTopology topology) {
-               
-               // Setup SamzaConfigFactory
-               SamzaConfigFactory configFactory = new SamzaConfigFactory();
-               configFactory.setLocalMode(isLocalMode)
-               .setZookeeper(zookeeper)
-               .setKafka(kafka)
-               .setYarnPackage(yarnPackagePath)
-               .setAMMemory(amMem)
-               .setContainerMemory(containerMem)
-               .setPiPerContainerRatio(piPerContainerRatio)
-               .setKryoRegisterFile(kryoRegisterFile)
-               .setCheckpointFrequency(checkpointFrequency)
-               .setReplicationFactor(kafkaReplicationFactor);
-               
-               // Generate the list of Configs
-               List<MapConfig> configs;
-               try {
-                       // ConfigFactory generate a list of configs
-                       // Serialize a map of PIs and store in a file in the 
jar at jarFilePath
-                       // (in dat/ folder)
-                       configs = 
configFactory.getMapConfigsForTopology(topology);
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       return;
-               }
-               
-               // Create kafka streams
-               Set<Stream> streams = topology.getStreams(); 
-               for (Stream stream:streams) {
-                       SamzaStream samzaStream = (SamzaStream) stream;
-                       List<SamzaSystemStream> systemStreams = 
samzaStream.getSystemStreams();
-                       for (SamzaSystemStream systemStream:systemStreams) {
-                               // all streams should be kafka streams
-                               
SystemsUtils.createKafkaTopic(systemStream.getStream(),systemStream.getParallelism(),kafkaReplicationFactor);
-                       }
-               }
-               
-               // Submit the jobs with those configs
-               for (MapConfig config:configs) {
-                       logger.info("Config:{}",config);
-                       JobRunner jobRunner = new JobRunner(config);
-                       jobRunner.run();
-               }
-       }
-
-       private void _setupSystemsUtils() {
-               // Setup Utils
-               if (!isLocalMode)
-                       SystemsUtils.setHadoopConfigHome(yarnConfHome);
-               SystemsUtils.setZookeeper(zookeeper);
-       }
-       
-       /*
-        * Setter methods
-        */
-       public static SamzaEngine getEngine() {
-               return engine;
-       }
-       
-       public SamzaEngine setZooKeeper(String zk) {
-               this.zookeeper = zk;
-               return this;
-       }
-       
-       public SamzaEngine setKafka(String kafka) {
-               this.kafka = kafka;
-               return this;
-       }
-       
-       public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
-               this.kafkaReplicationFactor = replicationFactor;
-               return this;
-       }
-       
-       public SamzaEngine setCheckpointFrequency(int freq) {
-               this.checkpointFrequency = freq;
-               return this;
-       }
-       
-       public SamzaEngine setLocalMode(boolean isLocal) {
-               this.isLocalMode = isLocal;
-               return this;
-       }
-       
-       public SamzaEngine setYarnPackage(String yarnPackagePath) {
-               this.yarnPackagePath = yarnPackagePath;
-               return this;
-       }
-       
-       public SamzaEngine setConfigHome(String configHome) {
-               this.yarnConfHome = configHome;
-               return this;
-       }
-       
-       public SamzaEngine setAMMemory(int mem) {
-               this.amMem = mem;
-               return this;
-       }
-       
-       public SamzaEngine setContainerMemory(int mem) {
-               this.containerMem = mem;
-               return this;
-       }
-       
-       public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
-               this.piPerContainerRatio = piPerContainer;
-               return this;
-       }
-       
-       public SamzaEngine setKryoRegisterFile(String registerFile) {
-               this.kryoRegisterFile = registerFile;
-               return this;
-       }
-       
-       /**
-        * Submit a list of Samza jobs correspond to the submitted 
-        * topology
-        * 
-        * @param topo
-        *            the submitted topology
-        */
-       public static void submitTopology(SamzaTopology topo) {
-               // Setup SystemsUtils
-               engine._setupSystemsUtils();
-               
-               // Submit topology
-               engine._submitTopology(topo);
-       }
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SamzaEngine.class);
+
+  /*
+   * Singleton instance
+   */
+  private static SamzaEngine engine = new SamzaEngine();
+
+  private String zookeeper;
+  private String kafka;
+  private int kafkaReplicationFactor;
+  private boolean isLocalMode;
+  private String yarnPackagePath;
+  private String yarnConfHome;
+
+  private String kryoRegisterFile;
+
+  private int amMem;
+  private int containerMem;
+  private int piPerContainerRatio;
+
+  private int checkpointFrequency;
+
+  private void _submitTopology(SamzaTopology topology) {
+
+    // Setup SamzaConfigFactory
+    SamzaConfigFactory configFactory = new SamzaConfigFactory();
+    configFactory.setLocalMode(isLocalMode)
+        .setZookeeper(zookeeper)
+        .setKafka(kafka)
+        .setYarnPackage(yarnPackagePath)
+        .setAMMemory(amMem)
+        .setContainerMemory(containerMem)
+        .setPiPerContainerRatio(piPerContainerRatio)
+        .setKryoRegisterFile(kryoRegisterFile)
+        .setCheckpointFrequency(checkpointFrequency)
+        .setReplicationFactor(kafkaReplicationFactor);
+
+    // Generate the list of Configs
+    List<MapConfig> configs;
+    try {
+      // ConfigFactory generate a list of configs
+      // Serialize a map of PIs and store in a file in the jar at jarFilePath
+      // (in dat/ folder)
+      configs = configFactory.getMapConfigsForTopology(topology);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
+
+    // Create kafka streams
+    Set<Stream> streams = topology.getStreams();
+    for (Stream stream : streams) {
+      SamzaStream samzaStream = (SamzaStream) stream;
+      List<SamzaSystemStream> systemStreams = samzaStream.getSystemStreams();
+      for (SamzaSystemStream systemStream : systemStreams) {
+        // all streams should be kafka streams
+        SystemsUtils.createKafkaTopic(systemStream.getStream(), 
systemStream.getParallelism(), kafkaReplicationFactor);
+      }
+    }
+
+    // Submit the jobs with those configs
+    for (MapConfig config : configs) {
+      logger.info("Config:{}", config);
+      JobRunner jobRunner = new JobRunner(config);
+      jobRunner.run();
+    }
+  }
+
+  private void _setupSystemsUtils() {
+    // Setup Utils
+    if (!isLocalMode)
+      SystemsUtils.setHadoopConfigHome(yarnConfHome);
+    SystemsUtils.setZookeeper(zookeeper);
+  }
+
+  /*
+   * Setter methods
+   */
+  public static SamzaEngine getEngine() {
+    return engine;
+  }
+
+  public SamzaEngine setZooKeeper(String zk) {
+    this.zookeeper = zk;
+    return this;
+  }
+
+  public SamzaEngine setKafka(String kafka) {
+    this.kafka = kafka;
+    return this;
+  }
+
+  public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
+    this.kafkaReplicationFactor = replicationFactor;
+    return this;
+  }
+
+  public SamzaEngine setCheckpointFrequency(int freq) {
+    this.checkpointFrequency = freq;
+    return this;
+  }
+
+  public SamzaEngine setLocalMode(boolean isLocal) {
+    this.isLocalMode = isLocal;
+    return this;
+  }
+
+  public SamzaEngine setYarnPackage(String yarnPackagePath) {
+    this.yarnPackagePath = yarnPackagePath;
+    return this;
+  }
+
+  public SamzaEngine setConfigHome(String configHome) {
+    this.yarnConfHome = configHome;
+    return this;
+  }
+
+  public SamzaEngine setAMMemory(int mem) {
+    this.amMem = mem;
+    return this;
+  }
+
+  public SamzaEngine setContainerMemory(int mem) {
+    this.containerMem = mem;
+    return this;
+  }
+
+  public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
+    this.piPerContainerRatio = piPerContainer;
+    return this;
+  }
+
+  public SamzaEngine setKryoRegisterFile(String registerFile) {
+    this.kryoRegisterFile = registerFile;
+    return this;
+  }
+
+  /**
+   * Submit a list of Samza jobs correspond to the submitted topology
+   * 
+   * @param topo
+   *          the submitted topology
+   */
+  public static void submitTopology(SamzaTopology topo) {
+    // Setup SystemsUtils
+    engine._setupSystemsUtils();
+
+    // Submit topology
+    engine._submitTopology(topo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
index e89d789..6eea7cb 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
@@ -44,179 +44,196 @@ import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
 import com.yahoo.labs.samoa.utils.SystemsUtils;
 
 /**
- * EntranceProcessingItem for Samza
- * which is also a Samza task (StreamTask & InitableTask)
+ * EntranceProcessingItem for Samza which is also a Samza task (StreamTask &
+ * InitableTask)
  * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem
-                                         implements SamzaProcessingNode, 
Serializable, StreamTask, InitableTask {
+    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 7157734520046135039L;
-       
-       /*
-        * Constructors
-        */
-       public SamzaEntranceProcessingItem(EntranceProcessor processor) {
-               super(processor);
-       }
-       
-       // Need this so Samza can initialize a StreamTask
-       public SamzaEntranceProcessingItem() {} 
-       
-       /*
-        * Simple setters, getters
-        */
-       @Override
-       public int addOutputStream(SamzaStream stream) {
-               this.setOutputStream(stream);
-               return 1; // entrance PI should have only 1 output stream
-       }
-       
-       /*
-        * Serialization
-        */
-       private Object writeReplace() {
-               return new SerializationProxy(this);
-       }
-       
-       private static class SerializationProxy implements Serializable {
-               /**
+  private static final long serialVersionUID = 7157734520046135039L;
+
+  /*
+   * Constructors
+   */
+  public SamzaEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // Need this so Samza can initialize a StreamTask
+  public SamzaEntranceProcessingItem() {
+  }
+
+  /*
+   * Simple setters, getters
+   */
+  @Override
+  public int addOutputStream(SamzaStream stream) {
+    this.setOutputStream(stream);
+    return 1; // entrance PI should have only 1 output stream
+  }
+
+  /*
+   * Serialization
+   */
+  private Object writeReplace() {
+    return new SerializationProxy(this);
+  }
+
+  private static class SerializationProxy implements Serializable {
+    /**
                 * 
                 */
-               private static final long serialVersionUID = 
313907132721414634L;
-               
-               private EntranceProcessor processor;
-               private SamzaStream outputStream;
-               private String name;
-               
-               public SerializationProxy(SamzaEntranceProcessingItem epi) {
-                       this.processor = epi.getProcessor();
-                       this.outputStream = (SamzaStream)epi.getOutputStream();
-                       this.name = epi.getName();
-               }
-       }
-       
-       /*
-        * Implement Samza Task
-        */
-       @Override
-       public void init(Config config, TaskContext context) throws Exception {
-               String yarnConfHome = 
config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
-               if (yarnConfHome != null && yarnConfHome.length() > 0) // if 
the property is set , otherwise, assume we are running in
-                                                                               
                        // local mode and ignore this
-                       SystemsUtils.setHadoopConfigHome(yarnConfHome);
-               
-               String filename = config.get(SamzaConfigFactory.FILE_KEY);
-               String filesystem = 
config.get(SamzaConfigFactory.FILESYSTEM_KEY);
-               
-               this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
-               SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, 
this.getName());
-               this.setOutputStream(wrapper.outputStream);
-               SamzaStream output = (SamzaStream)this.getOutputStream();
-               if (output != null) // if output stream exists, set it up
-                       output.onCreate();
-       }
-
-       @Override
-       public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) throws Exception {
-               SamzaStream output = (SamzaStream)this.getOutputStream();
-               if (output == null) return; // if there is no output stream, do 
nothing
-               output.setCollector(collector);
-               ContentEvent event = (ContentEvent) envelope.getMessage();
-               output.put(event);
-       }
-       
-       /*
-        * Implementation of Samza's SystemConsumer to get events from source
-        * and feed to SAMOA system
-        * 
-        */
-       /* Current implementation: buffer the incoming events and send a batch 
-        * of them when poll() is called by Samza system.
-        * 
-        * Currently: it has a "soft" limit on the size of the buffer:
-        * when the buffer size reaches the limit, the reading thread will sleep
-        * for 100ms.
-        * A hard limit can be achieved by overriding the method
-        * protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue()
-        * of BlockingEnvelopeMap
-        * But then we have handle the case when the queue is full.
-        * 
-        */
-       public static class SamoaSystemConsumer extends BlockingEnvelopeMap {
-               
-               private EntranceProcessor entranceProcessor = null;
-               private SystemStreamPartition systemStreamPartition;
-               
-               private static final Logger logger = 
LoggerFactory.getLogger(SamoaSystemConsumer.class);
-
-               public SamoaSystemConsumer(String systemName, Config config) {
-                       String yarnConfHome = 
config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
-                       if (yarnConfHome != null && yarnConfHome.length() > 0) 
// if the property is set , otherwise, assume we are running in
-                                                                           // 
local mode and ignore this
-                               SystemsUtils.setHadoopConfigHome(yarnConfHome);
-                       
-                       String filename = 
config.get(SamzaConfigFactory.FILE_KEY);
-                       String filesystem = 
config.get(SamzaConfigFactory.FILESYSTEM_KEY);
-                       String name = 
config.get(SamzaConfigFactory.JOB_NAME_KEY);
-                       SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, name);
-                       
-                       this.entranceProcessor = wrapper.processor;
-                       this.entranceProcessor.onCreate(0);
-                       
-                       // Internal stream from SystemConsumer to EntranceTask, 
so we
-                       // need only one partition
-                       this.systemStreamPartition = new 
SystemStreamPartition(systemName, wrapper.name, new Partition(0));
-               }
-               
-               @Override
-               public void start() {
-                       Thread processorPollingThread = new Thread(
-                       new Runnable() {
-                           @Override
-                           public void run() {
-                               try {
-                                   pollingEntranceProcessor();
-                                   setIsAtHead(systemStreamPartition, true);
-                               } catch (InterruptedException e) {
-                                   e.getStackTrace();
-                                   stop();
-                               }
-                           }
-                       }
-               );
-
-               processorPollingThread.start();
-               }
-
-               @Override
-               public void stop() {
-
-               }
-               
-               private void pollingEntranceProcessor() throws 
InterruptedException {
-                       int messageCnt = 0;
-                       while(!this.entranceProcessor.isFinished()) {
-                               messageCnt = 
this.getNumMessagesInQueue(systemStreamPartition);
-                               if (this.entranceProcessor.hasNext() && 
messageCnt < 10000) { // soft limit on the size of the queue
-                                       this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition,null, 
null,this.entranceProcessor.nextEvent()));
-                               } else {
-                                       try {
-                                               Thread.sleep(100);
-                                       } catch (InterruptedException e) {
-                                               break;
-                                       }
-                               }
-                       }
-                       
-                       // Send last event
-                       this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition,null, 
null,this.entranceProcessor.nextEvent()));
-               }
-               
-       }
+    private static final long serialVersionUID = 313907132721414634L;
+
+    private EntranceProcessor processor;
+    private SamzaStream outputStream;
+    private String name;
+
+    public SerializationProxy(SamzaEntranceProcessingItem epi) {
+      this.processor = epi.getProcessor();
+      this.outputStream = (SamzaStream) epi.getOutputStream();
+      this.name = epi.getName();
+    }
+  }
+
+  /*
+   * Implement Samza Task
+   */
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+    if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property 
is
+                                                           // set , otherwise,
+                                                           // assume we are
+                                                           // running in
+      // local mode and ignore this
+      SystemsUtils.setHadoopConfigHome(yarnConfHome);
+
+    String filename = config.get(SamzaConfigFactory.FILE_KEY);
+    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+
+    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
+    SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
+        filename, this.getName());
+    this.setOutputStream(wrapper.outputStream);
+    SamzaStream output = (SamzaStream) this.getOutputStream();
+    if (output != null) // if output stream exists, set it up
+      output.onCreate();
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
+      throws Exception {
+    SamzaStream output = (SamzaStream) this.getOutputStream();
+    if (output == null)
+      return; // if there is no output stream, do nothing
+    output.setCollector(collector);
+    ContentEvent event = (ContentEvent) envelope.getMessage();
+    output.put(event);
+  }
+
+  /*
+   * Implementation of Samza's SystemConsumer to get events from source and 
feed
+   * to SAMOA system
+   */
+  /*
+   * Current implementation: buffer the incoming events and send a batch of 
them
+   * when poll() is called by Samza system.
+   * 
+   * Currently: it has a "soft" limit on the size of the buffer: when the 
buffer
+   * size reaches the limit, the reading thread will sleep for 100ms. A hard
+   * limit can be achieved by overriding the method protected
+   * BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() of
+   * BlockingEnvelopeMap But then we have handle the case when the queue is
+   * full.
+   */
+  public static class SamoaSystemConsumer extends BlockingEnvelopeMap {
+
+    private EntranceProcessor entranceProcessor = null;
+    private SystemStreamPartition systemStreamPartition;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(SamoaSystemConsumer.class);
+
+    public SamoaSystemConsumer(String systemName, Config config) {
+      String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+      if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property
+                                                             // is set ,
+                                                             // otherwise,
+                                                             // assume we are
+                                                             // running in
+        // local mode and ignore this
+        SystemsUtils.setHadoopConfigHome(yarnConfHome);
+
+      String filename = config.get(SamzaConfigFactory.FILE_KEY);
+      String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+      String name = config.get(SamzaConfigFactory.JOB_NAME_KEY);
+      SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
+          filename, name);
+
+      this.entranceProcessor = wrapper.processor;
+      this.entranceProcessor.onCreate(0);
+
+      // Internal stream from SystemConsumer to EntranceTask, so we
+      // need only one partition
+      this.systemStreamPartition = new SystemStreamPartition(systemName, 
wrapper.name, new Partition(0));
+    }
+
+    @Override
+    public void start() {
+      Thread processorPollingThread = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                pollingEntranceProcessor();
+                setIsAtHead(systemStreamPartition, true);
+              } catch (InterruptedException e) {
+                e.getStackTrace();
+                stop();
+              }
+            }
+          }
+          );
+
+      processorPollingThread.start();
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    private void pollingEntranceProcessor() throws InterruptedException {
+      int messageCnt = 0;
+      while (!this.entranceProcessor.isFinished()) {
+        messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
+        if (this.entranceProcessor.hasNext() && messageCnt < 10000) { // soft
+                                                                      // limit
+                                                                      // on the
+                                                                      // size 
of
+                                                                      // the
+                                                                      // queue
+          this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition, null, null,
+              this.entranceProcessor.nextEvent()));
+        } else {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+
+      // Send last event
+      this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition, null, null,
+          this.entranceProcessor.nextEvent()));
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
index db72e7c..7c97e65 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
@@ -46,120 +46,127 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 /**
- * ProcessingItem for Samza
- * which is also a Samza task (StreamTask and InitableTask)
+ * ProcessingItem for Samza which is also a Samza task (StreamTask and
+ * InitableTask)
  * 
  * @author Anh Thu Vu
  */
-public class SamzaProcessingItem extends AbstractProcessingItem 
-                                 implements SamzaProcessingNode, Serializable, 
StreamTask, InitableTask {
-       
-       /**
+public class SamzaProcessingItem extends AbstractProcessingItem
+    implements SamzaProcessingNode, Serializable, StreamTask, InitableTask {
+
+  /**
         * 
         */
-       private static final long serialVersionUID = 1L;
+  private static final long serialVersionUID = 1L;
 
-       private Set<SamzaSystemStream> inputStreams; // input streams: 
system.stream
-       private List<SamzaStream> outputStreams;
-       
-       /*
-        * Constructors
-        */
-       // Need this so Samza can initialize a StreamTask
-       public SamzaProcessingItem() {}
-       
-       /* 
-        * Implement com.yahoo.labs.samoa.topology.ProcessingItem
-        */
-       public SamzaProcessingItem(Processor processor, int parallelismHint) {
-               super(processor, parallelismHint);
-               this.inputStreams = new HashSet<SamzaSystemStream>();
-               this.outputStreams = new LinkedList<SamzaStream>();
-       }
-       
-       /*
-        * Simple setters, getters
-        */
-       public Set<SamzaSystemStream> getInputStreams() {
-               return this.inputStreams;
-       }
-       
-       /*
-        * Extends AbstractProcessingItem
-        */
-       @Override
-       protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-               SamzaSystemStream stream = ((SamzaStream) 
inputStream).addDestination(new 
StreamDestination(this,this.getParallelism(),scheme));
-               this.inputStreams.add(stream);
-               return this;
-       }
-
-       /*
-        * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode
-        */
-       @Override
-       public int addOutputStream(SamzaStream stream) {
-               this.outputStreams.add(stream);
-               return this.outputStreams.size();
-       }
-       
-       public List<SamzaStream> getOutputStreams() {
-               return this.outputStreams;
-       }
-
-       /*
-        * Implement Samza task
-        */
-       @Override
-       public void init(Config config, TaskContext context) throws Exception {
-               String yarnConfHome = 
config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
-               if (yarnConfHome != null && yarnConfHome.length() > 0) // if 
the property is set , otherwise, assume we are running in
-                                                                               
                                // local mode and ignore this
-                       SystemsUtils.setHadoopConfigHome(yarnConfHome);
-               
-               String filename = config.get(SamzaConfigFactory.FILE_KEY);
-               String filesystem = 
config.get(SamzaConfigFactory.FILESYSTEM_KEY);
-               this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
-               SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, 
this.getName());
-               this.setProcessor(wrapper.processor);
-               this.outputStreams = wrapper.outputStreams;
-               
-               // Init Processor and Streams
-               this.getProcessor().onCreate(0);
-               for (SamzaStream stream:this.outputStreams) {
-                       stream.onCreate();
-               }
-               
-       }
-
-       @Override
-       public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) throws Exception {
-               for (SamzaStream stream:this.outputStreams) {
-                       stream.setCollector(collector);
-               }
-               this.getProcessor().process((ContentEvent) 
envelope.getMessage());
-       }
-       
-       /*
-        * SerializationProxy
-        */
-       private Object writeReplace() {
-               return new SerializationProxy(this);
-       }
-       
-       private static class SerializationProxy implements Serializable {
-               /**
+  private Set<SamzaSystemStream> inputStreams; // input streams: system.stream
+  private List<SamzaStream> outputStreams;
+
+  /*
+   * Constructors
+   */
+  // Need this so Samza can initialize a StreamTask
+  public SamzaProcessingItem() {
+  }
+
+  /*
+   * Implement com.yahoo.labs.samoa.topology.ProcessingItem
+   */
+  public SamzaProcessingItem(Processor processor, int parallelismHint) {
+    super(processor, parallelismHint);
+    this.inputStreams = new HashSet<SamzaSystemStream>();
+    this.outputStreams = new LinkedList<SamzaStream>();
+  }
+
+  /*
+   * Simple setters, getters
+   */
+  public Set<SamzaSystemStream> getInputStreams() {
+    return this.inputStreams;
+  }
+
+  /*
+   * Extends AbstractProcessingItem
+   */
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    SamzaSystemStream stream = ((SamzaStream) inputStream).addDestination(new 
StreamDestination(this, this
+        .getParallelism(), scheme));
+    this.inputStreams.add(stream);
+    return this;
+  }
+
+  /*
+   * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode
+   */
+  @Override
+  public int addOutputStream(SamzaStream stream) {
+    this.outputStreams.add(stream);
+    return this.outputStreams.size();
+  }
+
+  public List<SamzaStream> getOutputStreams() {
+    return this.outputStreams;
+  }
+
+  /*
+   * Implement Samza task
+   */
+  @Override
+  public void init(Config config, TaskContext context) throws Exception {
+    String yarnConfHome = config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+    if (yarnConfHome != null && yarnConfHome.length() > 0) // if the property 
is
+                                                           // set , otherwise,
+                                                           // assume we are
+                                                           // running in
+      // local mode and ignore this
+      SystemsUtils.setHadoopConfigHome(yarnConfHome);
+
+    String filename = config.get(SamzaConfigFactory.FILE_KEY);
+    String filesystem = config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+    this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
+    SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem,
+        filename, this.getName());
+    this.setProcessor(wrapper.processor);
+    this.outputStreams = wrapper.outputStreams;
+
+    // Init Processor and Streams
+    this.getProcessor().onCreate(0);
+    for (SamzaStream stream : this.outputStreams) {
+      stream.onCreate();
+    }
+
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator)
+      throws Exception {
+    for (SamzaStream stream : this.outputStreams) {
+      stream.setCollector(collector);
+    }
+    this.getProcessor().process((ContentEvent) envelope.getMessage());
+  }
+
+  /*
+   * SerializationProxy
+   */
+  private Object writeReplace() {
+    return new SerializationProxy(this);
+  }
+
+  private static class SerializationProxy implements Serializable {
+    /**
                 * 
                 */
-               private static final long serialVersionUID = 
1534643987559070336L;
-               
-               private Processor processor;
-               private List<SamzaStream> outputStreams;
-               
-               public SerializationProxy(SamzaProcessingItem pi) {
-                       this.processor = pi.getProcessor();
-                       this.outputStreams = pi.getOutputStreams();
-               }
-       }
+    private static final long serialVersionUID = 1534643987559070336L;
+
+    private Processor processor;
+    private List<SamzaStream> outputStreams;
+
+    public SerializationProxy(SamzaProcessingItem pi) {
+      this.processor = pi.getProcessor();
+      this.outputStreams = pi.getOutputStreams();
+    }
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
index be13673..1dbccb6 100644
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
@@ -23,34 +23,36 @@ package com.yahoo.labs.samoa.topology.impl;
 import com.yahoo.labs.samoa.topology.IProcessingItem;
 
 /**
- * Common interface of SamzaEntranceProcessingItem and
- * SamzaProcessingItem
+ * Common interface of SamzaEntranceProcessingItem and SamzaProcessingItem
  * 
  * @author Anh Thu Vu
  */
 public interface SamzaProcessingNode extends IProcessingItem {
-       /**
-        * Registers an output stream with this processing item
-        * 
-        * @param stream
-        *               the output stream
-        * @return the number of output streams of this processing item
-        */
-       public int addOutputStream(SamzaStream stream);
-       
-       /**
-        * Gets the name/id of this processing item
-        * 
-        * @return the name/id of this processing item
-        */
-       // TODO: include getName() and setName() in IProcessingItem and/or 
AbstractEPI/PI
-       public String getName();
-       
-       /**
-        * Sets the name/id for this processing item
-        * @param name
-        *            the name/id of this processing item
-        */
-       // TODO: include getName() and setName() in IProcessingItem and/or 
AbstractEPI/PI
-       public void setName(String name);
+  /**
+   * Registers an output stream with this processing item
+   * 
+   * @param stream
+   *          the output stream
+   * @return the number of output streams of this processing item
+   */
+  public int addOutputStream(SamzaStream stream);
+
+  /**
+   * Gets the name/id of this processing item
+   * 
+   * @return the name/id of this processing item
+   */
+  // TODO: include getName() and setName() in IProcessingItem and/or
+  // AbstractEPI/PI
+  public String getName();
+
+  /**
+   * Sets the name/id for this processing item
+   * 
+   * @param name
+   *          the name/id of this processing item
+   */
+  // TODO: include getName() and setName() in IProcessingItem and/or
+  // AbstractEPI/PI
+  public void setName(String name);
 }
\ No newline at end of file

Reply via email to