http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
index fc0630a..758ae1e 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
@@ -31,87 +31,88 @@ import backtype.storm.Config;
 import backtype.storm.utils.Utils;
 
 /**
- * The main class that used by samoa script to execute SAMOA task. 
+ * The main class that used by samoa script to execute SAMOA task.
  * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class StormDoTask {
-       private static final Logger logger = 
LoggerFactory.getLogger(StormDoTask.class);
-       private static String localFlag = "local";
-       private static String clusterFlag = "cluster";
-       
-       /**
-        * The main method.
-        * 
-        * @param args the arguments
-        */
-       public static void main(String[] args) {
-
-                List<String> tmpArgs = new 
ArrayList<String>(Arrays.asList(args));
-               
-               boolean isLocal = isLocal(tmpArgs);
-               int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
-               
-               args = tmpArgs.toArray(new String[0]);
-               
-               //convert the arguments into Storm topology
-               StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
-               String topologyName = stormTopo.getTopologyName();
-               
-       Config conf = new Config();
-       conf.putAll(Utils.readStormConfig());
-       conf.setDebug(false);
-                       
-       
-               if(isLocal){
-                       //local mode
-                       conf.setMaxTaskParallelism(numWorker);
-                       
-                       backtype.storm.LocalCluster cluster = new 
backtype.storm.LocalCluster();
-                       cluster.submitTopology(topologyName , conf, 
stormTopo.getStormBuilder().createTopology());
-                       
-                       backtype.storm.utils.Utils.sleep(600*1000);
-                       
-                       cluster.killTopology(topologyName);
-                       cluster.shutdown();
-                       
-               }else{
-                       //cluster mode
-                       conf.setNumWorkers(numWorker);
-                       try {
-                               
backtype.storm.StormSubmitter.submitTopology(topologyName, conf,
-                                               
stormTopo.getStormBuilder().createTopology());
-                       } catch (backtype.storm.generated.AlreadyAliveException 
ale) {
-                               ale.printStackTrace();
-                       } catch 
(backtype.storm.generated.InvalidTopologyException ite) {
-                               ite.printStackTrace();
-                       }
-               }
-       }
-       
-       private static boolean isLocal(List<String> tmpArgs){
-               ExecutionMode executionMode = ExecutionMode.UNDETERMINED;
-               
-               int position = tmpArgs.size() - 1;
-               String flag = tmpArgs.get(position);
-               boolean isLocal = true;
-               
-               if(flag.equals(clusterFlag)){
-                       executionMode = ExecutionMode.CLUSTER;
-                       isLocal = false;
-               }else if(flag.equals(localFlag)){
-                       executionMode = ExecutionMode.LOCAL;
-                       isLocal = true;
-               }
-               
-               if(executionMode != ExecutionMode.UNDETERMINED){
-                       tmpArgs.remove(position);
-               }
-               
-               return isLocal;
-       }
-       
-       private enum ExecutionMode {LOCAL, CLUSTER, UNDETERMINED};
+  private static final Logger logger = 
LoggerFactory.getLogger(StormDoTask.class);
+  private static String localFlag = "local";
+  private static String clusterFlag = "cluster";
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
+
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+
+    boolean isLocal = isLocal(tmpArgs);
+    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    // convert the arguments into Storm topology
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+    String topologyName = stormTopo.getTopologyName();
+
+    Config conf = new Config();
+    conf.putAll(Utils.readStormConfig());
+    conf.setDebug(false);
+
+    if (isLocal) {
+      // local mode
+      conf.setMaxTaskParallelism(numWorker);
+
+      backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
+      cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
+
+      backtype.storm.utils.Utils.sleep(600 * 1000);
+
+      cluster.killTopology(topologyName);
+      cluster.shutdown();
+
+    } else {
+      // cluster mode
+      conf.setNumWorkers(numWorker);
+      try {
+        backtype.storm.StormSubmitter.submitTopology(topologyName, conf,
+            stormTopo.getStormBuilder().createTopology());
+      } catch (backtype.storm.generated.AlreadyAliveException ale) {
+        ale.printStackTrace();
+      } catch (backtype.storm.generated.InvalidTopologyException ite) {
+        ite.printStackTrace();
+      }
+    }
+  }
+
+  private static boolean isLocal(List<String> tmpArgs) {
+    ExecutionMode executionMode = ExecutionMode.UNDETERMINED;
+
+    int position = tmpArgs.size() - 1;
+    String flag = tmpArgs.get(position);
+    boolean isLocal = true;
+
+    if (flag.equals(clusterFlag)) {
+      executionMode = ExecutionMode.CLUSTER;
+      isLocal = false;
+    } else if (flag.equals(localFlag)) {
+      executionMode = ExecutionMode.LOCAL;
+      isLocal = true;
+    }
+
+    if (executionMode != ExecutionMode.UNDETERMINED) {
+      tmpArgs.remove(position);
+    }
+
+    return isLocal;
+  }
+
+  private enum ExecutionMode {
+    LOCAL, CLUSTER, UNDETERMINED
+  };
 }
-               

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
index d4d80bf..832ee34 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
@@ -41,168 +41,172 @@ import com.yahoo.labs.samoa.topology.Stream;
  * EntranceProcessingItem implementation for Storm.
  */
 class StormEntranceProcessingItem extends AbstractEntranceProcessingItem 
implements StormTopologyNode {
-    private final StormEntranceSpout piSpout;
-
-    StormEntranceProcessingItem(EntranceProcessor processor) {
-        this(processor, UUID.randomUUID().toString());
+  private final StormEntranceSpout piSpout;
+
+  StormEntranceProcessingItem(EntranceProcessor processor) {
+    this(processor, UUID.randomUUID().toString());
+  }
+
+  StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) {
+    super(processor);
+    this.setName(friendlyId);
+    this.piSpout = new StormEntranceSpout(processor);
+  }
+
+  @Override
+  public EntranceProcessingItem setOutputStream(Stream stream) {
+    // piSpout.streams.add(stream);
+    piSpout.setOutputStream((StormStream) stream);
+    return this;
+  }
+
+  @Override
+  public Stream getOutputStream() {
+    return piSpout.getOutputStream();
+  }
+
+  @Override
+  public void addToTopology(StormTopology topology, int parallelismHint) {
+    topology.getStormBuilder().setSpout(this.getName(), piSpout, 
parallelismHint);
+  }
+
+  @Override
+  public StormStream createStream() {
+    return piSpout.createStream(this.getName());
+  }
+
+  @Override
+  public String getId() {
+    return this.getName();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.insert(0, String.format("id: %s, ", this.getName()));
+    return sb.toString();
+  }
+
+  /**
+   * Resulting Spout of StormEntranceProcessingItem
+   */
+  final static class StormEntranceSpout extends BaseRichSpout {
+
+    private static final long serialVersionUID = -9066409791668954099L;
+
+    // private final Set<StormSpoutStream> streams;
+    private final EntranceProcessor entranceProcessor;
+    private StormStream outputStream;
+
+    // private transient SpoutStarter spoutStarter;
+    // private transient Executor spoutExecutors;
+    // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue;
+
+    private SpoutOutputCollector collector;
+
+    StormEntranceSpout(EntranceProcessor processor) {
+      // this.streams = new HashSet<StormSpoutStream>();
+      this.entranceProcessor = processor;
     }
 
-    StormEntranceProcessingItem(EntranceProcessor processor, String 
friendlyId) {
-       super(processor);
-       this.setName(friendlyId);
-        this.piSpout = new StormEntranceSpout(processor);
+    public StormStream getOutputStream() {
+      return outputStream;
     }
 
-    @Override
-    public EntranceProcessingItem setOutputStream(Stream stream) {
-        // piSpout.streams.add(stream);
-        piSpout.setOutputStream((StormStream) stream);
-        return this;
-    }
-    
-    @Override
-    public Stream getOutputStream() {
-       return piSpout.getOutputStream();
+    public void setOutputStream(StormStream stream) {
+      this.outputStream = stream;
     }
-    
+
     @Override
-    public void addToTopology(StormTopology topology, int parallelismHint) {
-        topology.getStormBuilder().setSpout(this.getName(), piSpout, 
parallelismHint);
+    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext 
context, SpoutOutputCollector collector) {
+      this.collector = collector;
+      // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>();
+
+      // Processor and this class share the same instance of stream
+      // for (StormSpoutStream stream : streams) {
+      // stream.setSpout(this);
+      // }
+      // outputStream.setSpout(this);
+
+      this.entranceProcessor.onCreate(context.getThisTaskId());
+      // this.spoutStarter = new SpoutStarter(this.starter);
+
+      // this.spoutExecutors = Executors.newSingleThreadExecutor();
+      // this.spoutExecutors.execute(spoutStarter);
     }
 
     @Override
-    public StormStream createStream() {
-        return piSpout.createStream(this.getName());
+    public void nextTuple() {
+      if (entranceProcessor.hasNext()) {
+        Values value = newValues(entranceProcessor.nextEvent());
+        collector.emit(outputStream.getOutputId(), value);
+      } else
+        Utils.sleep(1000);
+      // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50,
+      // TimeUnit.MILLISECONDS);
+      // if (tupleInfo != null) {
+      // Values value = newValues(tupleInfo.getContentEvent());
+      // collector.emit(tupleInfo.getStormStream().getOutputId(), value);
+      // }
     }
 
     @Override
-    public String getId() {
-        return this.getName();
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      // for (StormStream stream : streams) {
+      // declarer.declareStream(stream.getOutputId(), new
+      // Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+      // StormSamoaUtils.KEY_FIELD));
+      // }
+      declarer.declareStream(outputStream.getOutputId(), new 
Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+          StormSamoaUtils.KEY_FIELD));
     }
 
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder(super.toString());
-        sb.insert(0, String.format("id: %s, ", this.getName()));
-        return sb.toString();
+    StormStream createStream(String piId) {
+      // StormSpoutStream stream = new StormSpoutStream(piId);
+      StormStream stream = new StormBoltStream(piId);
+      // streams.add(stream);
+      return stream;
     }
 
-    /**
-     * Resulting Spout of StormEntranceProcessingItem
-     */
-    final static class StormEntranceSpout extends BaseRichSpout {
-
-        private static final long serialVersionUID = -9066409791668954099L;
-
-        // private final Set<StormSpoutStream> streams;
-        private final EntranceProcessor entranceProcessor;
-        private StormStream outputStream;
-
-        // private transient SpoutStarter spoutStarter;
-        // private transient Executor spoutExecutors;
-        // private transient LinkedBlockingQueue<StormTupleInfo> 
tupleInfoQueue;
-
-        private SpoutOutputCollector collector;
-
-        StormEntranceSpout(EntranceProcessor processor) {
-            // this.streams = new HashSet<StormSpoutStream>();
-            this.entranceProcessor = processor;
-        }
-
-        public StormStream getOutputStream() {
-            return outputStream;
-        }
-
-        public void setOutputStream(StormStream stream) {
-            this.outputStream = stream;
-        }
-
-        @Override
-        public void open(@SuppressWarnings("rawtypes") Map conf, 
TopologyContext context, SpoutOutputCollector collector) {
-            this.collector = collector;
-            // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>();
-
-            // Processor and this class share the same instance of stream
-            // for (StormSpoutStream stream : streams) {
-            // stream.setSpout(this);
-            // }
-            // outputStream.setSpout(this);
-
-            this.entranceProcessor.onCreate(context.getThisTaskId());
-            // this.spoutStarter = new SpoutStarter(this.starter);
-
-            // this.spoutExecutors = Executors.newSingleThreadExecutor();
-            // this.spoutExecutors.execute(spoutStarter);
-        }
-
-        @Override
-        public void nextTuple() {
-            if (entranceProcessor.hasNext()) {
-                Values value = newValues(entranceProcessor.nextEvent());
-                collector.emit(outputStream.getOutputId(), value);
-            } else
-                Utils.sleep(1000);
-            // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, 
TimeUnit.MILLISECONDS);
-            // if (tupleInfo != null) {
-            // Values value = newValues(tupleInfo.getContentEvent());
-            // collector.emit(tupleInfo.getStormStream().getOutputId(), value);
-            // }
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            // for (StormStream stream : streams) {
-            // declarer.declareStream(stream.getOutputId(), new 
Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD));
-            // }
-            declarer.declareStream(outputStream.getOutputId(), new 
Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD));
-        }
-
-        StormStream createStream(String piId) {
-            // StormSpoutStream stream = new StormSpoutStream(piId);
-            StormStream stream = new StormBoltStream(piId);
-            // streams.add(stream);
-            return stream;
-        }
-
-        // void put(StormSpoutStream stream, ContentEvent contentEvent) {
-        // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent));
-        // }
-
-        private Values newValues(ContentEvent contentEvent) {
-            return new Values(contentEvent, contentEvent.getKey());
-        }
-
-        // private final static class StormTupleInfo {
-        //
-        // private final StormStream stream;
-        // private final ContentEvent event;
-        //
-        // StormTupleInfo(StormStream stream, ContentEvent event) {
-        // this.stream = stream;
-        // this.event = event;
-        // }
-        //
-        // public StormStream getStormStream() {
-        // return this.stream;
-        // }
-        //
-        // public ContentEvent getContentEvent() {
-        // return this.event;
-        // }
-        // }
-
-        // private final static class SpoutStarter implements Runnable {
-        //
-        // private final TopologyStarter topoStarter;
-        //
-        // SpoutStarter(TopologyStarter topoStarter) {
-        // this.topoStarter = topoStarter;
-        // }
-        //
-        // @Override
-        // public void run() {
-        // this.topoStarter.start();
-        // }
-        // }
+    // void put(StormSpoutStream stream, ContentEvent contentEvent) {
+    // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent));
+    // }
+
+    private Values newValues(ContentEvent contentEvent) {
+      return new Values(contentEvent, contentEvent.getKey());
     }
+
+    // private final static class StormTupleInfo {
+    //
+    // private final StormStream stream;
+    // private final ContentEvent event;
+    //
+    // StormTupleInfo(StormStream stream, ContentEvent event) {
+    // this.stream = stream;
+    // this.event = event;
+    // }
+    //
+    // public StormStream getStormStream() {
+    // return this.stream;
+    // }
+    //
+    // public ContentEvent getContentEvent() {
+    // return this.event;
+    // }
+    // }
+
+    // private final static class SpoutStarter implements Runnable {
+    //
+    // private final TopologyStarter topoStarter;
+    //
+    // SpoutStarter(TopologyStarter topoStarter) {
+    // this.topoStarter = topoStarter;
+    // }
+    //
+    // @Override
+    // public void run() {
+    // this.topoStarter.start();
+    // }
+    // }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
index 5f86855..6594aa7 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
@@ -34,42 +34,42 @@ import backtype.storm.utils.Utils;
  * Utility class to submit samoa-storm jar to a Storm cluster.
  * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class StormJarSubmitter {
-       
-       public final static String UPLOADED_JAR_LOCATION_KEY = 
"UploadedJarLocation";
 
-       /**
-        * @param args
-        * @throws IOException 
-        */
-       public static void main(String[] args) throws IOException {
-               
-               Config config = new Config();
-               config.putAll(Utils.readCommandLineOpts());
-               config.putAll(Utils.readStormConfig());
+  public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation";
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+
+    Config config = new Config();
+    config.putAll(Utils.readCommandLineOpts());
+    config.putAll(Utils.readStormConfig());
+
+    String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
+    int nimbusThriftPort = Utils.getInt(config
+        .get(Config.NIMBUS_THRIFT_PORT));
+
+    System.out.println("Nimbus host " + nimbusHost);
+    System.out.println("Nimbus thrift port " + nimbusThriftPort);
+
+    System.out.println("uploading jar from " + args[0]);
+    String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]);
+
+    System.out.println("Uploaded jar file location: ");
+    System.out.println(uploadedJarLocation);
 
-               String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
-               int nimbusThriftPort = Utils.getInt(config
-                               .get(Config.NIMBUS_THRIFT_PORT));
+    Properties props = StormSamoaUtils.getProperties();
+    props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, 
uploadedJarLocation);
 
-               System.out.println("Nimbus host " + nimbusHost);
-               System.out.println("Nimbus thrift port " + nimbusThriftPort);
+    File f = new File("src/main/resources/samoa-storm-cluster.properties");
+    f.createNewFile();
 
-               System.out.println("uploading jar from " + args[0]);
-               String uploadedJarLocation = StormSubmitter.submitJar(config, 
args[0]);
-               
-               System.out.println("Uploaded jar file location: ");
-               System.out.println(uploadedJarLocation);
-               
-               Properties props = StormSamoaUtils.getProperties();
-               props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, 
uploadedJarLocation);
-               
-               File f = new 
File("src/main/resources/samoa-storm-cluster.properties");
-               f.createNewFile();
-               
-               OutputStream out = new FileOutputStream(f);
-               props.store(out, "properties file to store uploaded jar 
location from StormJarSubmitter");
-       }
+    OutputStream out = new FileOutputStream(f);
+    props.store(out, "properties file to store uploaded jar location from 
StormJarSubmitter");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
index 73879f6..1a9064c 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
@@ -44,127 +44,126 @@ import backtype.storm.tuple.Tuple;
 
 /**
  * ProcessingItem implementation for Storm.
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 class StormProcessingItem extends AbstractProcessingItem implements 
StormTopologyNode {
-       private final ProcessingItemBolt piBolt;        
-       private BoltDeclarer piBoltDeclarer;
-       
-       //TODO: should we put parallelism hint here? 
-       //imo, parallelism hint only declared when we add this PI in the 
topology
-       //open for dicussion :p
-               
-       StormProcessingItem(Processor processor, int parallelismHint){
-               this(processor, UUID.randomUUID().toString(), parallelismHint);
-       }
-       
-       StormProcessingItem(Processor processor, String friendlyId, int 
parallelismHint){
-               super(processor, parallelismHint);
-               this.piBolt = new ProcessingItemBolt(processor);
-               this.setName(friendlyId);
-       }
-
-       @Override
-       protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-               StormStream stormInputStream = (StormStream) inputStream;
-               InputStreamId inputId = stormInputStream.getInputId();
-               
-               switch(scheme) {
-               case SHUFFLE:
-                       
piBoltDeclarer.shuffleGrouping(inputId.getComponentId(),inputId.getStreamId());
-                       break;
-               case GROUP_BY_KEY:
-                       piBoltDeclarer.fieldsGrouping(
-                                       inputId.getComponentId(), 
-                                       inputId.getStreamId(), 
-                                       new Fields(StormSamoaUtils.KEY_FIELD));
-                       break;
-               case BROADCAST:
-                       piBoltDeclarer.allGrouping(
-                                       inputId.getComponentId(), 
-                                       inputId.getStreamId());
-                       break;
-               }
-               return this;
-       }
-       
-       @Override
-       public void addToTopology(StormTopology topology, int parallelismHint) {
-               if(piBoltDeclarer != null){
-                       //throw exception that one PI only belong to one 
topology
-               }else{          
-                       TopologyBuilder stormBuilder = 
topology.getStormBuilder();
-                       this.piBoltDeclarer = 
stormBuilder.setBolt(this.getName(), 
-                                       this.piBolt, parallelismHint);
-               }       
-       }
-
-       @Override
-       public StormStream createStream() {
-               return piBolt.createStream(this.getName());
-       }
-
-       @Override
-       public String getId() {
-               return this.getName();
-       }
-       
-       @Override
-       public String toString() {
-               StringBuilder sb = new StringBuilder(super.toString());
-               sb.insert(0, String.format("id: %s, ", this.getName()));
-               return sb.toString();
-       }
-               
-       private final static class ProcessingItemBolt extends BaseRichBolt{
-               
-               private static final long serialVersionUID = 
-6637673741263199198L;
-               
-               private final Set<StormBoltStream> streams;
-               private final Processor processor;
-               
-               private OutputCollector collector;
-               
-               ProcessingItemBolt(Processor processor){
-                       this.streams = new HashSet<StormBoltStream>();
-                       this.processor = processor;
-               }
-               
-               @Override
-               public void prepare(@SuppressWarnings("rawtypes") Map 
stormConf, TopologyContext context,
-                               OutputCollector collector) {
-                       this.collector = collector;     
-                       //Processor and this class share the same instance of 
stream
-                       for(StormBoltStream stream: streams){
-                               stream.setCollector(this.collector);
-                       }
-                       
-                       this.processor.onCreate(context.getThisTaskId());
-               }
-
-               @Override
-               public void execute(Tuple input) {
-                       Object sentObject = input.getValue(0);
-                       ContentEvent sentEvent = (ContentEvent)sentObject;
-                       processor.process(sentEvent);
-               }
-
-               @Override
-               public void declareOutputFields(OutputFieldsDeclarer declarer) {
-                       for(StormStream stream: streams){
-                               declarer.declareStream(stream.getOutputId(), 
-                                               new 
Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
-                                                               
StormSamoaUtils.KEY_FIELD));
-                       }
-               }
-               
-               StormStream createStream(String piId){
-                       StormBoltStream stream = new StormBoltStream(piId);
-                       streams.add(stream);
-                       return stream;
-               }
-       }
-}
+  private final ProcessingItemBolt piBolt;
+  private BoltDeclarer piBoltDeclarer;
+
+  // TODO: should we put parallelism hint here?
+  // imo, parallelism hint only declared when we add this PI in the topology
+  // open for dicussion :p
+
+  StormProcessingItem(Processor processor, int parallelismHint) {
+    this(processor, UUID.randomUUID().toString(), parallelismHint);
+  }
+
+  StormProcessingItem(Processor processor, String friendlyId, int 
parallelismHint) {
+    super(processor, parallelismHint);
+    this.piBolt = new ProcessingItemBolt(processor);
+    this.setName(friendlyId);
+  }
+
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    StormStream stormInputStream = (StormStream) inputStream;
+    InputStreamId inputId = stormInputStream.getInputId();
+
+    switch (scheme) {
+    case SHUFFLE:
+      piBoltDeclarer.shuffleGrouping(inputId.getComponentId(), 
inputId.getStreamId());
+      break;
+    case GROUP_BY_KEY:
+      piBoltDeclarer.fieldsGrouping(
+          inputId.getComponentId(),
+          inputId.getStreamId(),
+          new Fields(StormSamoaUtils.KEY_FIELD));
+      break;
+    case BROADCAST:
+      piBoltDeclarer.allGrouping(
+          inputId.getComponentId(),
+          inputId.getStreamId());
+      break;
+    }
+    return this;
+  }
+
+  @Override
+  public void addToTopology(StormTopology topology, int parallelismHint) {
+    if (piBoltDeclarer != null) {
+      // throw exception that one PI only belong to one topology
+    } else {
+      TopologyBuilder stormBuilder = topology.getStormBuilder();
+      this.piBoltDeclarer = stormBuilder.setBolt(this.getName(),
+          this.piBolt, parallelismHint);
+    }
+  }
+
+  @Override
+  public StormStream createStream() {
+    return piBolt.createStream(this.getName());
+  }
+
+  @Override
+  public String getId() {
+    return this.getName();
+  }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.insert(0, String.format("id: %s, ", this.getName()));
+    return sb.toString();
+  }
 
+  private final static class ProcessingItemBolt extends BaseRichBolt {
+
+    private static final long serialVersionUID = -6637673741263199198L;
+
+    private final Set<StormBoltStream> streams;
+    private final Processor processor;
+
+    private OutputCollector collector;
+
+    ProcessingItemBolt(Processor processor) {
+      this.streams = new HashSet<StormBoltStream>();
+      this.processor = processor;
+    }
+
+    @Override
+    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, 
TopologyContext context,
+        OutputCollector collector) {
+      this.collector = collector;
+      // Processor and this class share the same instance of stream
+      for (StormBoltStream stream : streams) {
+        stream.setCollector(this.collector);
+      }
+
+      this.processor.onCreate(context.getThisTaskId());
+    }
+
+    @Override
+    public void execute(Tuple input) {
+      Object sentObject = input.getValue(0);
+      ContentEvent sentEvent = (ContentEvent) sentObject;
+      processor.process(sentEvent);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      for (StormStream stream : streams) {
+        declarer.declareStream(stream.getOutputId(),
+            new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+                StormSamoaUtils.KEY_FIELD));
+      }
+    }
+
+    StormStream createStream(String piId) {
+      StormBoltStream stream = new StormBoltStream(piId);
+      streams.add(stream);
+      return stream;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
index 7c4769e..d978a8f 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
@@ -34,75 +34,82 @@ import org.slf4j.LoggerFactory;
 import com.yahoo.labs.samoa.tasks.Task;
 
 /**
- * Utility class for samoa-storm project. It is used by StormDoTask to process 
its arguments.
+ * Utility class for samoa-storm project. It is used by StormDoTask to process
+ * its arguments.
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class StormSamoaUtils {
-       
-       private static final Logger logger = 
LoggerFactory.getLogger(StormSamoaUtils.class);
-
-       static final String KEY_FIELD = "key";
-       static final String CONTENT_EVENT_FIELD = "content_event";
-               
-       static Properties getProperties() throws IOException{
-               Properties props = new Properties();
-               InputStream is;
-               
-               File f = new 
File("src/main/resources/samoa-storm-cluster.properties"); // FIXME it does not 
exist anymore
-               is = new FileInputStream(f);
-               
-               try {
-                       props.load(is);
-               } catch (IOException e1) {
-                       System.out.println("Fail to load property file");
-                       return null;
-               } finally{
-                       is.close();
-               }
-               
-               return props;
-       }
-       
-       public static StormTopology argsToTopology(String[] args){
-               StringBuilder cliString = new StringBuilder();
-               for (String arg : args) {
-                       cliString.append(" ").append(arg);
-               }
-               logger.debug("Command line string = {}", cliString.toString());
-
-               Task task = getTask(cliString.toString());
-               
-               //TODO: remove setFactory method with DynamicBinding
-               task.setFactory(new StormComponentFactory());
-               task.init();
-
-               return (StormTopology)task.getTopology();
-       }
-       
-       public static int numWorkers(List<String> tmpArgs){
-               int position = tmpArgs.size() - 1;
-               int numWorkers;
-               
-               try {
-                       numWorkers = Integer.parseInt(tmpArgs.get(position));
-                       tmpArgs.remove(position);
-               } catch (NumberFormatException e) {
-                       numWorkers = 4;
-               }
-               
-               return numWorkers;
-       }
-
-    public static Task getTask(String cliString) {
-        Task task = null;
-        try {
-            logger.debug("Providing task [{}]", cliString);
-            task = ClassOption.cliStringToObject(cliString, Task.class, null);
-        } catch (Exception e) {
-            logger.warn("Fail in initializing the task!");
-            e.printStackTrace();
-        }
-        return task;
+
+  private static final Logger logger = LoggerFactory.getLogger(StormSamoaUtils.class);
+
+  static final String KEY_FIELD = "key";
+  static final String CONTENT_EVENT_FIELD = "content_event";
+
+  static Properties getProperties() throws IOException {
+    Properties props = new Properties();
+    InputStream is;
+
+    File f = new File("src/main/resources/samoa-storm-cluster.properties"); // FIXME
+                                                                            // it
+                                                                            // does
+                                                                            // not
+                                                                            // exist
+                                                                            // anymore
+    is = new FileInputStream(f);
+
+    try {
+      props.load(is);
+    } catch (IOException e1) {
+      System.out.println("Fail to load property file");
+      return null;
+    } finally {
+      is.close();
+    }
+
+    return props;
+  }
+
+  public static StormTopology argsToTopology(String[] args) {
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+
+    Task task = getTask(cliString.toString());
+
+    // TODO: remove setFactory method with DynamicBinding
+    task.setFactory(new StormComponentFactory());
+    task.init();
+
+    return (StormTopology) task.getTopology();
+  }
+
+  public static int numWorkers(List<String> tmpArgs) {
+    int position = tmpArgs.size() - 1;
+    int numWorkers;
+
+    try {
+      numWorkers = Integer.parseInt(tmpArgs.get(position));
+      tmpArgs.remove(position);
+    } catch (NumberFormatException e) {
+      numWorkers = 4;
+    }
+
+    return numWorkers;
+  }
+
+  public static Task getTask(String cliString) {
+    Task task = null;
+    try {
+      logger.debug("Providing task [{}]", cliString);
+      task = ClassOption.cliStringToObject(cliString, Task.class, null);
+    } catch (Exception e) {
+      logger.warn("Fail in initializing the task!");
+      e.printStackTrace();
     }
+    return task;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
index d066e42..06f5bb2 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
@@ -62,4 +62,4 @@
 //             return null;
 //     }
 //
-//}
+// }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
index f67ab19..ed39a50 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
@@ -27,59 +27,60 @@ import com.yahoo.labs.samoa.topology.Stream;
 
 /**
  * Abstract class to implement Storm Stream
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 abstract class StormStream implements Stream, java.io.Serializable {
-               
-       /**
+
+  /**
         * 
         */
-       private static final long serialVersionUID = 281835563756514852L;
-       protected final String outputStreamId;
-       protected final InputStreamId inputStreamId;
-       
-       public StormStream(String stormComponentId){
-               this.outputStreamId = UUID.randomUUID().toString();
-               this.inputStreamId = new InputStreamId(stormComponentId, 
this.outputStreamId);
-       }
-       
-       @Override
-       public abstract void put(ContentEvent contentEvent);
-       
-       String getOutputId(){
-               return this.outputStreamId;
-       }
-       
-       InputStreamId getInputId(){
-               return this.inputStreamId;
-       }
-       
-       final static class InputStreamId implements java.io.Serializable{
-               
-               /**
+  private static final long serialVersionUID = 281835563756514852L;
+  protected final String outputStreamId;
+  protected final InputStreamId inputStreamId;
+
+  public StormStream(String stormComponentId) {
+    this.outputStreamId = UUID.randomUUID().toString();
+    this.inputStreamId = new InputStreamId(stormComponentId, this.outputStreamId);
+  }
+
+  @Override
+  public abstract void put(ContentEvent contentEvent);
+
+  String getOutputId() {
+    return this.outputStreamId;
+  }
+
+  InputStreamId getInputId() {
+    return this.inputStreamId;
+  }
+
+  final static class InputStreamId implements java.io.Serializable {
+
+    /**
                 * 
                 */
-               private static final long serialVersionUID = 
-7457995634133691295L;
-               private final String componentId;
-               private final String streamId;
-               
-               InputStreamId(String componentId, String streamId){
-                       this.componentId = componentId;
-                       this.streamId = streamId;
-               }
-               
-               String getComponentId(){
-                       return componentId;
-               }
-               
-               String getStreamId(){
-                       return streamId;
-               }
-       }
-       
-       @Override
-       public void setBatchSize(int batchSize) {
-               // Ignore batch size
-       }
+    private static final long serialVersionUID = -7457995634133691295L;
+    private final String componentId;
+    private final String streamId;
+
+    InputStreamId(String componentId, String streamId) {
+      this.componentId = componentId;
+      this.streamId = streamId;
+    }
+
+    String getComponentId() {
+      return componentId;
+    }
+
+    String getStreamId() {
+      return streamId;
+    }
+  }
+
+  @Override
+  public void setBatchSize(int batchSize) {
+    // Ignore batch size
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
index 7a49d8b..20995d5 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
@@ -27,26 +27,27 @@ import com.yahoo.labs.samoa.topology.AbstractTopology;
 
 /**
  * Adaptation of SAMOA topology in samoa-storm
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class StormTopology extends AbstractTopology {
-       
-       private TopologyBuilder builder;
-       
-       public StormTopology(String topologyName){
-               super(topologyName);
-               this.builder = new TopologyBuilder();
-       }
-       
-       @Override
-       public void addProcessingItem(IProcessingItem procItem, int 
parallelismHint){
-               StormTopologyNode stormNode = (StormTopologyNode) procItem;
-               stormNode.addToTopology(this, parallelismHint);
-               super.addProcessingItem(procItem, parallelismHint);
-       }
-       
-       public TopologyBuilder getStormBuilder(){
-               return builder;
-       }
+
+  private TopologyBuilder builder;
+
+  public StormTopology(String topologyName) {
+    super(topologyName);
+    this.builder = new TopologyBuilder();
+  }
+
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
+    StormTopologyNode stormNode = (StormTopologyNode) procItem;
+    stormNode.addToTopology(this, parallelismHint);
+    super.addProcessingItem(procItem, parallelismHint);
+  }
+
+  public TopologyBuilder getStormBuilder() {
+    return builder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
index 07fccbf..8be3a1b 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
@@ -22,13 +22,16 @@ package com.yahoo.labs.samoa.topology.impl;
 
 /**
  * Interface to represent a node in samoa-storm topology.
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 interface StormTopologyNode {
 
-       void addToTopology(StormTopology topology, int parallelismHint);
-       StormStream createStream();
-       String getId();
-       
+  void addToTopology(StormTopology topology, int parallelismHint);
+
+  StormStream createStream();
+
+  String getId();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
index 1e1b048..a4f1f51 100644
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
@@ -41,93 +41,95 @@ import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
 /**
- * Helper class to submit SAMOA task into Storm without the need of submitting 
the jar file.
- * The jar file must be submitted first using StormJarSubmitter class.
+ * Helper class to submit SAMOA task into Storm without the need of submitting
+ * the jar file. The jar file must be submitted first using StormJarSubmitter
+ * class.
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class StormTopologySubmitter {
-       
-       public static String YJP_OPTIONS_KEY="YjpOptions";
-       
-       private static Logger logger = 
LoggerFactory.getLogger(StormTopologySubmitter.class);
-               
-       public static void main(String[] args) throws IOException{
-               Properties props = StormSamoaUtils.getProperties();
-               
-               String uploadedJarLocation = 
props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
-               if(uploadedJarLocation == null){
-                       logger.error("Invalid properties file. It must have key 
{}", 
-                                       
StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
-                       return;
-               }
-               
-               List<String> tmpArgs = new 
ArrayList<String>(Arrays.asList(args));
-               int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
-               
-               args = tmpArgs.toArray(new String[0]);
-               StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
-
-               Config conf = new Config();
-               conf.putAll(Utils.readStormConfig());
-               conf.putAll(Utils.readCommandLineOpts());
-               conf.setDebug(false);
-               conf.setNumWorkers(numWorkers);
-               
-               String profilerOption = 
-                               
props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
-               if(profilerOption != null){
-                       String topoWorkerChildOpts =  (String) 
conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
-                       StringBuilder optionBuilder = new StringBuilder();
-                       if(topoWorkerChildOpts != null){
-                               optionBuilder.append(topoWorkerChildOpts);      
-                               optionBuilder.append(' ');
-                       }
-                       optionBuilder.append(profilerOption);
-                       conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, 
optionBuilder.toString());
-               }
-
-               Map<String, Object> myConfigMap = new HashMap<String, 
Object>(conf);
-               StringWriter out = new StringWriter();
-
-               try {
-                       JSONValue.writeJSONString(myConfigMap, out);
-               } catch (IOException e) {
-                       System.out.println("Error in writing JSONString");
-                       e.printStackTrace();
-                       return;
-               }
-               
-               Config config = new Config();
-               config.putAll(Utils.readStormConfig());
-               
-               String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
-                               
-               NimbusClient nc = new NimbusClient(nimbusHost);
-               String topologyName = stormTopo.getTopologyName();
-               try {
-                       System.out.println("Submitting topology with name: " 
-                                       + topologyName);
-                       nc.getClient().submitTopology(topologyName, 
uploadedJarLocation,
-                                       out.toString(), 
stormTopo.getStormBuilder().createTopology());
-                       System.out.println(topologyName + " is successfully 
submitted");
-
-               } catch (AlreadyAliveException aae) {
-                       System.out.println("Fail to submit " + topologyName
-                                       + "\nError message: " + aae.get_msg());
-               } catch (InvalidTopologyException ite) {
-                       System.out.println("Invalid topology for " + 
topologyName);
-                       ite.printStackTrace();
-               } catch (TException te) {
-                       System.out.println("Texception for " + topologyName);
-                       te.printStackTrace();
-               }               
-       }
-       
-       private static String uploadedJarLocation(List<String> tmpArgs){
-               int position = tmpArgs.size() - 1;
-               String uploadedJarLocation = tmpArgs.get(position);
-               tmpArgs.remove(position);
-               return uploadedJarLocation;
-       }
+
+  public static String YJP_OPTIONS_KEY = "YjpOptions";
+
+  private static Logger logger = LoggerFactory.getLogger(StormTopologySubmitter.class);
+
+  public static void main(String[] args) throws IOException {
+    Properties props = StormSamoaUtils.getProperties();
+
+    String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+    if (uploadedJarLocation == null) {
+      logger.error("Invalid properties file. It must have key {}",
+          StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+      return;
+    }
+
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+    int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+
+    Config conf = new Config();
+    conf.putAll(Utils.readStormConfig());
+    conf.putAll(Utils.readCommandLineOpts());
+    conf.setDebug(false);
+    conf.setNumWorkers(numWorkers);
+
+    String profilerOption =
+        props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
+    if (profilerOption != null) {
+      String topoWorkerChildOpts = (String) conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+      StringBuilder optionBuilder = new StringBuilder();
+      if (topoWorkerChildOpts != null) {
+        optionBuilder.append(topoWorkerChildOpts);
+        optionBuilder.append(' ');
+      }
+      optionBuilder.append(profilerOption);
+      conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString());
+    }
+
+    Map<String, Object> myConfigMap = new HashMap<String, Object>(conf);
+    StringWriter out = new StringWriter();
+
+    try {
+      JSONValue.writeJSONString(myConfigMap, out);
+    } catch (IOException e) {
+      System.out.println("Error in writing JSONString");
+      e.printStackTrace();
+      return;
+    }
+
+    Config config = new Config();
+    config.putAll(Utils.readStormConfig());
+
+    String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
+
+    NimbusClient nc = new NimbusClient(nimbusHost);
+    String topologyName = stormTopo.getTopologyName();
+    try {
+      System.out.println("Submitting topology with name: "
+          + topologyName);
+      nc.getClient().submitTopology(topologyName, uploadedJarLocation,
+          out.toString(), stormTopo.getStormBuilder().createTopology());
+      System.out.println(topologyName + " is successfully submitted");
+
+    } catch (AlreadyAliveException aae) {
+      System.out.println("Fail to submit " + topologyName
+          + "\nError message: " + aae.get_msg());
+    } catch (InvalidTopologyException ite) {
+      System.out.println("Invalid topology for " + topologyName);
+      ite.printStackTrace();
+    } catch (TException te) {
+      System.out.println("Texception for " + topologyName);
+      te.printStackTrace();
+    }
+  }
+
+  private static String uploadedJarLocation(List<String> tmpArgs) {
+    int position = tmpArgs.size() - 1;
+    String uploadedJarLocation = tmpArgs.get(position);
+    tmpArgs.remove(position);
+    return uploadedJarLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
index 15b80b5..9f6089c 100644
--- a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -24,45 +24,43 @@ import org.junit.Test;
 
 public class AlgosTest {
 
+  @Test(timeout = 60000)
+  public void testVHTWithStorm() throws Exception {
 
-    @Test(timeout = 60000)
-    public void testVHTWithStorm() throws Exception {
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30)
+        .prePollWait(15)
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
 
-        TestParams vhtConfig = new TestParams.Builder()
-                .inputInstances(200_000)
-                .samplingSize(20_000)
-                .evaluationInstances(200_000)
-                .classifiedInstances(200_000)
-                .classificationsCorrect(55f)
-                .kappaStat(0f)
-                .kappaTempStat(0f)
-                
.cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
-                .resultFilePollTimeout(30)
-                .prePollWait(15)
-                .taskClassName(LocalStormDoTask.class.getName())
-                .build();
-        TestUtils.test(vhtConfig);
+  }
 
-    }
-
-    @Test(timeout = 120000)
-    public void testBaggingWithStorm() throws Exception {
-        TestParams baggingConfig = new TestParams.Builder()
-                .inputInstances(200_000)
-                .samplingSize(20_000)
-                .evaluationInstances(180_000)
-                .classifiedInstances(190_000)
-                .classificationsCorrect(60f)
-                .kappaStat(0f)
-                .kappaTempStat(0f)
-                
.cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
-                .resultFilePollTimeout(40)
-                .prePollWait(20)
-                .taskClassName(LocalStormDoTask.class.getName())
-                .build();
-        TestUtils.test(baggingConfig);
-
-    }
+  @Test(timeout = 120000)
+  public void testBaggingWithStorm() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(190_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .resultFilePollTimeout(40)
+        .prePollWait(20)
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
 
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
index ec8929a..d233ca6 100644
--- 
a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
+++ 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
@@ -38,41 +38,46 @@ import backtype.storm.topology.TopologyBuilder;
 import com.yahoo.labs.samoa.core.Processor;
 
 public class StormProcessingItemTest {
-    private static final int PARRALLELISM_HINT_2 = 2;
-    private static final int PARRALLELISM_HINT_4 = 4;
-    private static final String ID = "id";
-    @Tested private StormProcessingItem pi;
-    @Mocked private Processor processor;
-    @Mocked private StormTopology topology;
-    @Mocked private TopologyBuilder stormBuilder = new TopologyBuilder();
+  private static final int PARRALLELISM_HINT_2 = 2;
+  private static final int PARRALLELISM_HINT_4 = 4;
+  private static final String ID = "id";
+  @Tested
+  private StormProcessingItem pi;
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private StormTopology topology;
+  @Mocked
+  private TopologyBuilder stormBuilder = new TopologyBuilder();
 
-    @Before
-    public void setUp() {
-        pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2);
-    }
+  @Before
+  public void setUp() {
+    pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2);
+  }
 
-    @Test
-    public void testAddToTopology() {
-        new Expectations() {
-            {
-                topology.getStormBuilder();
-                result = stormBuilder;
+  @Test
+  public void testAddToTopology() {
+    new Expectations() {
+      {
+        topology.getStormBuilder();
+        result = stormBuilder;
 
-                stormBuilder.setBolt(ID, (IRichBolt) any, anyInt);
-                result = new MockUp<BoltDeclarer>() {
-                }.getMockInstance();
-            }
-        };
+        stormBuilder.setBolt(ID, (IRichBolt) any, anyInt);
+        result = new MockUp<BoltDeclarer>() {
+        }.getMockInstance();
+      }
+    };
 
-        pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism 
hint is ignored
+    pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is
+                                                     // ignored
 
-        new Verifications() {
-            {
-                assertEquals(pi.getProcessor(), processor);
-                // TODO add methods to explore a topology and verify them
-                assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2);
-                assertEquals(pi.getId(), ID);
-            }
-        };
-    }
+    new Verifications() {
+      {
+        assertEquals(pi.getProcessor(), processor);
+        // TODO add methods to explore a topology and verify them
+        assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2);
+        assertEquals(pi.getId(), ID);
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java 
b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
index 08ad94f..26b6b74 100644
--- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
+++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
@@ -2,234 +2,234 @@ package com.yahoo.labs.samoa;
 
 public class TestParams {
 
-    /**
-     *  templates that take the following parameters:
-     *  <ul>
-     *      <li>the output file location as an argument (-d),
-     *      <li>the maximum number of instances for testing/training (-i)
-     *      <li>the sampling size (-f)
-     *      <li>the delay in ms between input instances (-w) , default is zero
-     *  </ul>
-     *  as well as the maximum number of instances for testing/training (-i) and the sampling size (-f)
-     */
-    public static class Templates {
-
-        public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
-                + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " +
-                "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)";
-
-        public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
-                + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " +
-                "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)";
-
-        // setting the number of nominal attributes to zero significantly reduces the processing time,
-        // so that it's acceptable in a test case
-        public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
-                + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " +
-                "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)";
-
-    }
-
-
-    public static final String EVALUATION_INSTANCES = "evaluation instances";
-    public static final String CLASSIFIED_INSTANCES = "classified instances";
-    public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)";
-    public static final String KAPPA_STAT = "Kappa Statistic (percent)";
-    public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)";
-
-
+  /**
+   * templates that take the following parameters:
+   * <ul>
+   * <li>the output file location as an argument (-d),
+   * <li>the maximum number of instances for testing/training (-i)
+   * <li>the sampling size (-f)
+   * <li>the delay in ms between input instances (-w) , default is zero
+   * </ul>
+   * as well as the maximum number of instances for testing/training (-i) and
+   * the sampling size (-f)
+   */
+  public static class Templates {
+
+    public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
+        + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " +
+        "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)";
+
+    public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
+        + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " +
+        "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)";
+
+    // setting the number of nominal attributes to zero significantly reduces
+    // the processing time,
+    // so that it's acceptable in a test case
+    public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
+        + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " +
+        "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)";
+
+  }
+
+  public static final String EVALUATION_INSTANCES = "evaluation instances";
+  public static final String CLASSIFIED_INSTANCES = "classified instances";
+  public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)";
+  public static final String KAPPA_STAT = "Kappa Statistic (percent)";
+  public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)";
+
+  private long inputInstances;
+  private long samplingSize;
+  private long evaluationInstances;
+  private long classifiedInstances;
+  private float classificationsCorrect;
+  private float kappaStat;
+  private float kappaTempStat;
+  private String cliStringTemplate;
+  private int pollTimeoutSeconds;
+  private final int prePollWait;
+  private int inputDelayMicroSec;
+  private String taskClassName;
+
+  private TestParams(String taskClassName,
+      long inputInstances,
+      long samplingSize,
+      long evaluationInstances,
+      long classifiedInstances,
+      float classificationsCorrect,
+      float kappaStat,
+      float kappaTempStat,
+      String cliStringTemplate,
+      int pollTimeoutSeconds,
+      int prePollWait,
+      int inputDelayMicroSec) {
+    this.taskClassName = taskClassName;
+    this.inputInstances = inputInstances;
+    this.samplingSize = samplingSize;
+    this.evaluationInstances = evaluationInstances;
+    this.classifiedInstances = classifiedInstances;
+    this.classificationsCorrect = classificationsCorrect;
+    this.kappaStat = kappaStat;
+    this.kappaTempStat = kappaTempStat;
+    this.cliStringTemplate = cliStringTemplate;
+    this.pollTimeoutSeconds = pollTimeoutSeconds;
+    this.prePollWait = prePollWait;
+    this.inputDelayMicroSec = inputDelayMicroSec;
+  }
+
+  public String getTaskClassName() {
+    return taskClassName;
+  }
+
+  public long getInputInstances() {
+    return inputInstances;
+  }
+
+  public long getSamplingSize() {
+    return samplingSize;
+  }
+
+  public int getPollTimeoutSeconds() {
+    return pollTimeoutSeconds;
+  }
+
+  public int getPrePollWaitSeconds() {
+    return prePollWait;
+  }
+
+  public String getCliStringTemplate() {
+    return cliStringTemplate;
+  }
+
+  public long getEvaluationInstances() {
+    return evaluationInstances;
+  }
+
+  public long getClassifiedInstances() {
+    return classifiedInstances;
+  }
+
+  public float getClassificationsCorrect() {
+    return classificationsCorrect;
+  }
+
+  public float getKappaStat() {
+    return kappaStat;
+  }
+
+  public float getKappaTempStat() {
+    return kappaTempStat;
+  }
+
+  public int getInputDelayMicroSec() {
+    return inputDelayMicroSec;
+  }
+
+  @Override
+  public String toString() {
+    return "TestParams{\n" +
+        "inputInstances=" + inputInstances + "\n" +
+        "samplingSize=" + samplingSize + "\n" +
+        "evaluationInstances=" + evaluationInstances + "\n" +
+        "classifiedInstances=" + classifiedInstances + "\n" +
+        "classificationsCorrect=" + classificationsCorrect + "\n" +
+        "kappaStat=" + kappaStat + "\n" +
+        "kappaTempStat=" + kappaTempStat + "\n" +
+        "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" +
+        "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" +
+        "prePollWait=" + prePollWait + "\n" +
+        "taskClassName='" + taskClassName + '\'' + "\n" +
+        "inputDelayMicroSec=" + inputDelayMicroSec + "\n" +
+        '}';
+  }
+
+  public static class Builder {
     private long inputInstances;
     private long samplingSize;
     private long evaluationInstances;
     private long classifiedInstances;
     private float classificationsCorrect;
-    private float kappaStat;
-    private float kappaTempStat;
+    private float kappaStat = 0f;
+    private float kappaTempStat = 0f;
     private String cliStringTemplate;
-    private int pollTimeoutSeconds;
-    private final int prePollWait;
-    private int inputDelayMicroSec;
+    private int pollTimeoutSeconds = 10;
+    private int prePollWaitSeconds = 10;
     private String taskClassName;
+    private int inputDelayMicroSec = 0;
 
-    private TestParams(String taskClassName,
-                       long inputInstances,
-                       long samplingSize,
-                       long evaluationInstances,
-                       long classifiedInstances,
-                       float classificationsCorrect,
-                       float kappaStat,
-                       float kappaTempStat,
-                       String cliStringTemplate,
-                       int pollTimeoutSeconds,
-                       int prePollWait,
-                       int inputDelayMicroSec) {
-        this.taskClassName = taskClassName;
-        this.inputInstances = inputInstances;
-        this.samplingSize = samplingSize;
-        this.evaluationInstances = evaluationInstances;
-        this.classifiedInstances = classifiedInstances;
-        this.classificationsCorrect = classificationsCorrect;
-        this.kappaStat = kappaStat;
-        this.kappaTempStat = kappaTempStat;
-        this.cliStringTemplate = cliStringTemplate;
-        this.pollTimeoutSeconds = pollTimeoutSeconds;
-        this.prePollWait = prePollWait;
-        this.inputDelayMicroSec = inputDelayMicroSec;
-    }
-
-    public String getTaskClassName() {
-        return taskClassName;
-    }
-
-    public long getInputInstances() {
-        return inputInstances;
+    public Builder taskClassName(String taskClassName) {
+      this.taskClassName = taskClassName;
+      return this;
     }
 
-    public long getSamplingSize() {
-        return samplingSize;
+    public Builder inputInstances(long inputInstances) {
+      this.inputInstances = inputInstances;
+      return this;
     }
 
-    public int getPollTimeoutSeconds() {
-        return pollTimeoutSeconds;
+    public Builder samplingSize(long samplingSize) {
+      this.samplingSize = samplingSize;
+      return this;
     }
 
-    public int getPrePollWaitSeconds() {
-        return prePollWait;
+    public Builder evaluationInstances(long evaluationInstances) {
+      this.evaluationInstances = evaluationInstances;
+      return this;
     }
 
-    public String getCliStringTemplate() {
-        return cliStringTemplate;
+    public Builder classifiedInstances(long classifiedInstances) {
+      this.classifiedInstances = classifiedInstances;
+      return this;
     }
 
-    public long getEvaluationInstances() {
-        return evaluationInstances;
+    public Builder classificationsCorrect(float classificationsCorrect) {
+      this.classificationsCorrect = classificationsCorrect;
+      return this;
     }
 
-    public long getClassifiedInstances() {
-        return classifiedInstances;
+    public Builder kappaStat(float kappaStat) {
+      this.kappaStat = kappaStat;
+      return this;
     }
 
-    public float getClassificationsCorrect() {
-        return classificationsCorrect;
+    public Builder kappaTempStat(float kappaTempStat) {
+      this.kappaTempStat = kappaTempStat;
+      return this;
     }
 
-    public float getKappaStat() {
-        return kappaStat;
+    public Builder cliStringTemplate(String cliStringTemplate) {
+      this.cliStringTemplate = cliStringTemplate;
+      return this;
     }
 
-    public float getKappaTempStat() {
-        return kappaTempStat;
+    public Builder resultFilePollTimeout(int pollTimeoutSeconds) {
+      this.pollTimeoutSeconds = pollTimeoutSeconds;
+      return this;
     }
 
-    public int getInputDelayMicroSec() {
-        return inputDelayMicroSec;
+    public Builder inputDelayMicroSec(int inputDelayMicroSec) {
+      this.inputDelayMicroSec = inputDelayMicroSec;
+      return this;
     }
 
-    @Override
-    public String toString() {
-        return "TestParams{\n" +
-                "inputInstances=" + inputInstances + "\n" +
-                "samplingSize=" + samplingSize + "\n" +
-                "evaluationInstances=" + evaluationInstances + "\n" +
-                "classifiedInstances=" + classifiedInstances + "\n" +
-                "classificationsCorrect=" + classificationsCorrect + "\n" +
-                "kappaStat=" + kappaStat + "\n" +
-                "kappaTempStat=" + kappaTempStat + "\n" +
-                "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" +
-                "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" +
-                "prePollWait=" + prePollWait + "\n" +
-                "taskClassName='" + taskClassName + '\'' + "\n" +
-                "inputDelayMicroSec=" + inputDelayMicroSec + "\n" +
-                '}';
+    public Builder prePollWait(int prePollWaitSeconds) {
+      this.prePollWaitSeconds = prePollWaitSeconds;
+      return this;
     }
 
-    public static class Builder {
-        private long inputInstances;
-        private long samplingSize;
-        private long evaluationInstances;
-        private long classifiedInstances;
-        private float classificationsCorrect;
-        private float kappaStat =0f;
-        private float kappaTempStat =0f;
-        private String cliStringTemplate;
-        private int pollTimeoutSeconds = 10;
-        private int prePollWaitSeconds = 10;
-        private String taskClassName;
-        private int inputDelayMicroSec = 0;
-
-        public Builder taskClassName(String taskClassName) {
-            this.taskClassName = taskClassName;
-            return this;
-        }
-
-        public Builder inputInstances(long inputInstances) {
-            this.inputInstances = inputInstances;
-            return this;
-        }
-
-        public Builder samplingSize(long samplingSize) {
-            this.samplingSize = samplingSize;
-            return this;
-        }
-
-        public Builder evaluationInstances(long evaluationInstances) {
-            this.evaluationInstances = evaluationInstances;
-            return this;
-        }
-
-        public Builder classifiedInstances(long classifiedInstances) {
-            this.classifiedInstances = classifiedInstances;
-            return this;
-        }
-
-        public Builder classificationsCorrect(float classificationsCorrect) {
-            this.classificationsCorrect = classificationsCorrect;
-            return this;
-        }
-
-        public Builder kappaStat(float kappaStat) {
-            this.kappaStat = kappaStat;
-            return this;
-        }
-
-        public Builder kappaTempStat(float kappaTempStat) {
-            this.kappaTempStat = kappaTempStat;
-            return this;
-        }
-
-        public Builder cliStringTemplate(String cliStringTemplate) {
-            this.cliStringTemplate = cliStringTemplate;
-            return this;
-        }
-
-        public Builder resultFilePollTimeout(int pollTimeoutSeconds) {
-            this.pollTimeoutSeconds = pollTimeoutSeconds;
-            return this;
-        }
-
-        public Builder inputDelayMicroSec(int inputDelayMicroSec) {
-            this.inputDelayMicroSec = inputDelayMicroSec;
-            return this;
-        }
-
-        public Builder prePollWait(int prePollWaitSeconds) {
-            this.prePollWaitSeconds = prePollWaitSeconds;
-            return this;
-        }
-
-        public TestParams build() {
-            return new TestParams(taskClassName,
-                    inputInstances,
-                    samplingSize,
-                    evaluationInstances,
-                    classifiedInstances,
-                    classificationsCorrect,
-                    kappaStat,
-                    kappaTempStat,
-                    cliStringTemplate,
-                    pollTimeoutSeconds,
-                    prePollWaitSeconds,
-                    inputDelayMicroSec);
-        }
+    public TestParams build() {
+      return new TestParams(taskClassName,
+          inputInstances,
+          samplingSize,
+          evaluationInstances,
+          classifiedInstances,
+          classificationsCorrect,
+          kappaStat,
+          kappaTempStat,
+          cliStringTemplate,
+          pollTimeoutSeconds,
+          prePollWaitSeconds,
+          inputDelayMicroSec);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
index d66f5df..c36706c 100644
--- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
+++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
@@ -41,113 +41,112 @@ import static org.junit.Assert.assertTrue;
 
 public class TestUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName());
-
-
-    public static void test(final TestParams testParams) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException {
-
-        final File tempFile = File.createTempFile("test", "test");
-
-        LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString());
-
-        Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
-
-            @Override
-            public Void call() throws Exception {
-                try {
-                    Class.forName(testParams.getTaskClassName())
-                            .getMethod("main", String[].class)
-                            .invoke(null, (Object) String.format(
-                                    testParams.getCliStringTemplate(),
-                                    tempFile.getAbsolutePath(),
-                                    testParams.getInputInstances(),
-                                    testParams.getSamplingSize(),
-                                    testParams.getInputDelayMicroSec()
-                                    ).split("[ ]"));
-                } catch (Exception e) {
-                    LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage());
-                }
-                return null;
-            }
-        });
-
-        Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));
-
-        CountDownLatch signalComplete = new CountDownLatch(1);
-
-        final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000);
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                tailer.run();
-            }
-        }).start();
-
-        signalComplete.await();
-        tailer.stop();
-
-        assertResults(tempFile, testParams);
-    }
-
-    public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException {
-
-        LOG.info("Checking results file " + outputFile.getAbsolutePath());
-        // 1. parse result file with csv parser
-        Reader in = new FileReader(outputFile);
-        Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false)
-                .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in);
-        CSVRecord last = null;
-        Iterator<CSVRecord> iterator = records.iterator();
-        CSVRecord header = iterator.next();
-        Assert.assertEquals("Invalid number of columns", 5, header.size());
-
-        Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim());
-        Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim());
-        Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2).trim());
-        Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim());
-        Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim());
-
-        // 2. check last line result
-        while (iterator.hasNext()) {
-            last = iterator.next();
+  private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName());
+
+  public static void test(final TestParams testParams) throws IOException, ClassNotFoundException,
+      NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException {
+
+    final File tempFile = File.createTempFile("test", "test");
+
+    LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString());
+
+    Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+
+      @Override
+      public Void call() throws Exception {
+        try {
+          Class.forName(testParams.getTaskClassName())
+              .getMethod("main", String[].class)
+              .invoke(null, (Object) String.format(
+                  testParams.getCliStringTemplate(),
+                  tempFile.getAbsolutePath(),
+                  testParams.getInputInstances(),
+                  testParams.getSamplingSize(),
+                  testParams.getInputDelayMicroSec()
+                  ).split("[ ]"));
+        } catch (Exception e) {
+          LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage());
         }
-
-        assertTrue(String.format("Unmet threshold expected %d got %f",
-                testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))),
-                testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0)));
-        assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(),
-                Float.parseFloat(last.get(1))),
-                testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1)));
-        assertTrue(String.format("Unmet threshold expected %f got %f",
-                testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))),
-                testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2)));
-        assertTrue(String.format("Unmet threshold expected %f got %f",
-                testParams.getKappaStat(), Float.parseFloat(last.get(3))),
-                testParams.getKappaStat() <= Float.parseFloat(last.get(3)));
-        assertTrue(String.format("Unmet threshold expected %f got %f",
-                testParams.getKappaTempStat(), Float.parseFloat(last.get(4))),
-                testParams.getKappaTempStat() <= Float.parseFloat(last.get(4)));
-
+        return null;
+      }
+    });
+
+    Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));
+
+    CountDownLatch signalComplete = new CountDownLatch(1);
+
+    final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000);
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        tailer.run();
+      }
+    }).start();
+
+    signalComplete.await();
+    tailer.stop();
+
+    assertResults(tempFile, testParams);
+  }
+
+  public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException {
+
+    LOG.info("Checking results file " + outputFile.getAbsolutePath());
+    // 1. parse result file with csv parser
+    Reader in = new FileReader(outputFile);
+    Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false)
+        .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in);
+    CSVRecord last = null;
+    Iterator<CSVRecord> iterator = records.iterator();
+    CSVRecord header = iterator.next();
+    Assert.assertEquals("Invalid number of columns", 5, header.size());
+
+    Assert
+        .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim());
+    Assert
+        .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim());
+    Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2)
+        .trim());
+    Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim());
+    Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim());
+
+    // 2. check last line result
+    while (iterator.hasNext()) {
+      last = iterator.next();
     }
 
-
-    private static class TestResultsTailerAdapter extends TailerListenerAdapter {
-
-        private final CountDownLatch signalComplete;
-
-        public TestResultsTailerAdapter(CountDownLatch signalComplete) {
-            this.signalComplete = signalComplete;
-        }
-
-        @Override
-        public void handle(String line) {
-            if ("# COMPLETED".equals(line.trim())) {
-                signalComplete.countDown();
-            }
-        }
+    assertTrue(String.format("Unmet threshold expected %d got %f",
+        testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))),
+        testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0)));
+    assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(),
+        Float.parseFloat(last.get(1))),
+        testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1)));
+    assertTrue(String.format("Unmet threshold expected %f got %f",
+        testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))),
+        testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2)));
+    assertTrue(String.format("Unmet threshold expected %f got %f",
+        testParams.getKappaStat(), Float.parseFloat(last.get(3))),
+        testParams.getKappaStat() <= Float.parseFloat(last.get(3)));
+    assertTrue(String.format("Unmet threshold expected %f got %f",
+        testParams.getKappaTempStat(), Float.parseFloat(last.get(4))),
+        testParams.getKappaTempStat() <= Float.parseFloat(last.get(4)));
+
+  }
+
+  private static class TestResultsTailerAdapter extends TailerListenerAdapter {
+
+    private final CountDownLatch signalComplete;
+
+    public TestResultsTailerAdapter(CountDownLatch signalComplete) {
+      this.signalComplete = signalComplete;
     }
 
-
-
+    @Override
+    public void handle(String line) {
+      if ("# COMPLETED".equals(line.trim())) {
+        signalComplete.countDown();
+      }
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
index 21ccf9e..2ac9ec1 100644
--- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
+++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
@@ -13,58 +13,58 @@ import com.yahoo.labs.samoa.topology.impl.ThreadsEngine;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class LocalThreadsDoTask {
-    private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class);
+  private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class);
 
-    /**
-     * The main method.
-     * 
-     * @param args
-     *            the arguments
-     */
-    public static void main(String[] args) {
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
 
-        ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-        
-        // Get number of threads for multithreading mode
-        int numThreads = 1;
-        for (int i=0; i<tmpArgs.size()-1; i++) {
-               if (tmpArgs.get(i).equals("-t")) {
-                       try {
-                               numThreads = Integer.parseInt(tmpArgs.get(i+1));
-                               tmpArgs.remove(i+1);
-                               tmpArgs.remove(i);
-                       } catch (NumberFormatException e) {
-                               System.err.println("Invalid number of threads.");
-                               System.err.println(e.getStackTrace());
-                       }
-               }
-        }
-        logger.info("Number of threads:{}", numThreads);
-        
-        args = tmpArgs.toArray(new String[0]);
-
-        StringBuilder cliString = new StringBuilder();
-        for (int i = 0; i < args.length; i++) {
-            cliString.append(" ").append(args[i]);
-        }
-        logger.debug("Command line string = {}", cliString.toString());
-        System.out.println("Command line string = " + cliString.toString());
+    ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
 
-        Task task = null;
+    // Get number of threads for multithreading mode
+    int numThreads = 1;
+    for (int i = 0; i < tmpArgs.size() - 1; i++) {
+      if (tmpArgs.get(i).equals("-t")) {
         try {
-            task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
-            logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName());
-        } catch (Exception e) {
-            logger.error("Fail to initialize the task", e);
-            System.out.println("Fail to initialize the task" + e);
-            return;
+          numThreads = Integer.parseInt(tmpArgs.get(i + 1));
+          tmpArgs.remove(i + 1);
+          tmpArgs.remove(i);
+        } catch (NumberFormatException e) {
+          System.err.println("Invalid number of threads.");
+          System.err.println(e.getStackTrace());
         }
-        task.setFactory(new ThreadsComponentFactory());
-        task.init();
-        
-        ThreadsEngine.submitTopology(task.getTopology(), numThreads);
+      }
     }
+    logger.info("Number of threads:{}", numThreads);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    StringBuilder cliString = new StringBuilder();
+    for (int i = 0; i < args.length; i++) {
+      cliString.append(" ").append(args[i]);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+    System.out.println("Command line string = " + cliString.toString());
+
+    Task task = null;
+    try {
+      task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
+      logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName());
+    } catch (Exception e) {
+      logger.error("Fail to initialize the task", e);
+      System.out.println("Fail to initialize the task" + e);
+      return;
+    }
+    task.setFactory(new ThreadsComponentFactory());
+    task.init();
+
+    ThreadsEngine.submitTopology(task.getTopology(), numThreads);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
index ac68da2..91f213b 100644
--- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
+++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
@@ -31,34 +31,35 @@ import com.yahoo.labs.samoa.topology.Topology;
 
 /**
  * ComponentFactory for multithreaded engine
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsComponentFactory implements ComponentFactory {
 
-       @Override
-       public ProcessingItem createPi(Processor processor) {
-               return this.createPi(processor, 1);
-       }
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
 
-       @Override
-       public ProcessingItem createPi(Processor processor, int paralellism) {
-               return new ThreadsProcessingItem(processor, paralellism);
-       }
+  @Override
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    return new ThreadsProcessingItem(processor, paralellism);
+  }
 
-       @Override
-       public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
-               return new ThreadsEntranceProcessingItem(entranceProcessor);
-       }
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+    return new ThreadsEntranceProcessingItem(entranceProcessor);
+  }
 
-       @Override
-       public Stream createStream(IProcessingItem sourcePi) {
-               return new ThreadsStream(sourcePi);
-       }
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new ThreadsStream(sourcePi);
+  }
 
-       @Override
-       public Topology createTopology(String topoName) {
-               return new ThreadsTopology(topoName);
-       }
+  @Override
+  public Topology createTopology(String topoName) {
+    return new ThreadsTopology(topoName);
+  }
 
 }

Reply via email to