http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
deleted file mode 100644
index cba6b28..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
+++ /dev/null
@@ -1,212 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.Map;
-import java.util.UUID;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * EntranceProcessingItem implementation for Storm.
- */
-class StormEntranceProcessingItem extends AbstractEntranceProcessingItem 
implements StormTopologyNode {
-  private final StormEntranceSpout piSpout;
-
-  StormEntranceProcessingItem(EntranceProcessor processor) {
-    this(processor, UUID.randomUUID().toString());
-  }
-
-  StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) {
-    super(processor);
-    this.setName(friendlyId);
-    this.piSpout = new StormEntranceSpout(processor);
-  }
-
-  @Override
-  public EntranceProcessingItem setOutputStream(Stream stream) {
-    // piSpout.streams.add(stream);
-    piSpout.setOutputStream((StormStream) stream);
-    return this;
-  }
-
-  @Override
-  public Stream getOutputStream() {
-    return piSpout.getOutputStream();
-  }
-
-  @Override
-  public void addToTopology(StormTopology topology, int parallelismHint) {
-    topology.getStormBuilder().setSpout(this.getName(), piSpout, 
parallelismHint);
-  }
-
-  @Override
-  public StormStream createStream() {
-    return piSpout.createStream(this.getName());
-  }
-
-  @Override
-  public String getId() {
-    return this.getName();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(super.toString());
-    sb.insert(0, String.format("id: %s, ", this.getName()));
-    return sb.toString();
-  }
-
-  /**
-   * Resulting Spout of StormEntranceProcessingItem
-   */
-  final static class StormEntranceSpout extends BaseRichSpout {
-
-    private static final long serialVersionUID = -9066409791668954099L;
-
-    // private final Set<StormSpoutStream> streams;
-    private final EntranceProcessor entranceProcessor;
-    private StormStream outputStream;
-
-    // private transient SpoutStarter spoutStarter;
-    // private transient Executor spoutExecutors;
-    // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue;
-
-    private SpoutOutputCollector collector;
-
-    StormEntranceSpout(EntranceProcessor processor) {
-      // this.streams = new HashSet<StormSpoutStream>();
-      this.entranceProcessor = processor;
-    }
-
-    public StormStream getOutputStream() {
-      return outputStream;
-    }
-
-    public void setOutputStream(StormStream stream) {
-      this.outputStream = stream;
-    }
-
-    @Override
-    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext 
context, SpoutOutputCollector collector) {
-      this.collector = collector;
-      // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>();
-
-      // Processor and this class share the same instance of stream
-      // for (StormSpoutStream stream : streams) {
-      // stream.setSpout(this);
-      // }
-      // outputStream.setSpout(this);
-
-      this.entranceProcessor.onCreate(context.getThisTaskId());
-      // this.spoutStarter = new SpoutStarter(this.starter);
-
-      // this.spoutExecutors = Executors.newSingleThreadExecutor();
-      // this.spoutExecutors.execute(spoutStarter);
-    }
-
-    @Override
-    public void nextTuple() {
-      if (entranceProcessor.hasNext()) {
-        Values value = newValues(entranceProcessor.nextEvent());
-        collector.emit(outputStream.getOutputId(), value);
-      } else
-        Utils.sleep(1000);
-      // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50,
-      // TimeUnit.MILLISECONDS);
-      // if (tupleInfo != null) {
-      // Values value = newValues(tupleInfo.getContentEvent());
-      // collector.emit(tupleInfo.getStormStream().getOutputId(), value);
-      // }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      // for (StormStream stream : streams) {
-      // declarer.declareStream(stream.getOutputId(), new
-      // Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
-      // StormSamoaUtils.KEY_FIELD));
-      // }
-      declarer.declareStream(outputStream.getOutputId(), new 
Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
-          StormSamoaUtils.KEY_FIELD));
-    }
-
-    StormStream createStream(String piId) {
-      // StormSpoutStream stream = new StormSpoutStream(piId);
-      StormStream stream = new StormBoltStream(piId);
-      // streams.add(stream);
-      return stream;
-    }
-
-    // void put(StormSpoutStream stream, ContentEvent contentEvent) {
-    // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent));
-    // }
-
-    private Values newValues(ContentEvent contentEvent) {
-      return new Values(contentEvent, contentEvent.getKey());
-    }
-
-    // private final static class StormTupleInfo {
-    //
-    // private final StormStream stream;
-    // private final ContentEvent event;
-    //
-    // StormTupleInfo(StormStream stream, ContentEvent event) {
-    // this.stream = stream;
-    // this.event = event;
-    // }
-    //
-    // public StormStream getStormStream() {
-    // return this.stream;
-    // }
-    //
-    // public ContentEvent getContentEvent() {
-    // return this.event;
-    // }
-    // }
-
-    // private final static class SpoutStarter implements Runnable {
-    //
-    // private final TopologyStarter topoStarter;
-    //
-    // SpoutStarter(TopologyStarter topoStarter) {
-    // this.topoStarter = topoStarter;
-    // }
-    //
-    // @Override
-    // public void run() {
-    // this.topoStarter.start();
-    // }
-    // }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
deleted file mode 100644
index bd7a666..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Properties;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.utils.Utils;
-
-/**
- * Utility class to submit samoa-storm jar to a Storm cluster.
- * 
- * @author Arinto Murdopo
- * 
- */
-public class StormJarSubmitter {
-
-  public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation";
-
-  /**
-   * @param args
-   * @throws IOException
-   */
-  public static void main(String[] args) throws IOException {
-
-    Config config = new Config();
-    config.putAll(Utils.readCommandLineOpts());
-    config.putAll(Utils.readStormConfig());
-
-    String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
-    int nimbusThriftPort = Utils.getInt(config
-        .get(Config.NIMBUS_THRIFT_PORT));
-
-    System.out.println("Nimbus host " + nimbusHost);
-    System.out.println("Nimbus thrift port " + nimbusThriftPort);
-
-    System.out.println("uploading jar from " + args[0]);
-    String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]);
-
-    System.out.println("Uploaded jar file location: ");
-    System.out.println(uploadedJarLocation);
-
-    Properties props = StormSamoaUtils.getProperties();
-    props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, 
uploadedJarLocation);
-
-    File f = new File("src/main/resources/samoa-storm-cluster.properties");
-    f.createNewFile();
-
-    OutputStream out = new FileOutputStream(f);
-    props.store(out, "properties file to store uploaded jar location from 
StormJarSubmitter");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
deleted file mode 100644
index 509862b..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.impl.StormStream.InputStreamId;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-/**
- * ProcessingItem implementation for Storm.
- * 
- * @author Arinto Murdopo
- * 
- */
-class StormProcessingItem extends AbstractProcessingItem implements 
StormTopologyNode {
-  private final ProcessingItemBolt piBolt;
-  private BoltDeclarer piBoltDeclarer;
-
-  // TODO: should we put parallelism hint here?
-  // imo, parallelism hint only declared when we add this PI in the topology
-  // open for dicussion :p
-
-  StormProcessingItem(Processor processor, int parallelismHint) {
-    this(processor, UUID.randomUUID().toString(), parallelismHint);
-  }
-
-  StormProcessingItem(Processor processor, String friendlyId, int 
parallelismHint) {
-    super(processor, parallelismHint);
-    this.piBolt = new ProcessingItemBolt(processor);
-    this.setName(friendlyId);
-  }
-
-  @Override
-  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-    StormStream stormInputStream = (StormStream) inputStream;
-    InputStreamId inputId = stormInputStream.getInputId();
-
-    switch (scheme) {
-    case SHUFFLE:
-      piBoltDeclarer.shuffleGrouping(inputId.getComponentId(), 
inputId.getStreamId());
-      break;
-    case GROUP_BY_KEY:
-      piBoltDeclarer.fieldsGrouping(
-          inputId.getComponentId(),
-          inputId.getStreamId(),
-          new Fields(StormSamoaUtils.KEY_FIELD));
-      break;
-    case BROADCAST:
-      piBoltDeclarer.allGrouping(
-          inputId.getComponentId(),
-          inputId.getStreamId());
-      break;
-    }
-    return this;
-  }
-
-  @Override
-  public void addToTopology(StormTopology topology, int parallelismHint) {
-    if (piBoltDeclarer != null) {
-      // throw exception that one PI only belong to one topology
-    } else {
-      TopologyBuilder stormBuilder = topology.getStormBuilder();
-      this.piBoltDeclarer = stormBuilder.setBolt(this.getName(),
-          this.piBolt, parallelismHint);
-    }
-  }
-
-  @Override
-  public StormStream createStream() {
-    return piBolt.createStream(this.getName());
-  }
-
-  @Override
-  public String getId() {
-    return this.getName();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(super.toString());
-    sb.insert(0, String.format("id: %s, ", this.getName()));
-    return sb.toString();
-  }
-
-  private final static class ProcessingItemBolt extends BaseRichBolt {
-
-    private static final long serialVersionUID = -6637673741263199198L;
-
-    private final Set<StormBoltStream> streams;
-    private final Processor processor;
-
-    private OutputCollector collector;
-
-    ProcessingItemBolt(Processor processor) {
-      this.streams = new HashSet<StormBoltStream>();
-      this.processor = processor;
-    }
-
-    @Override
-    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, 
TopologyContext context,
-        OutputCollector collector) {
-      this.collector = collector;
-      // Processor and this class share the same instance of stream
-      for (StormBoltStream stream : streams) {
-        stream.setCollector(this.collector);
-      }
-
-      this.processor.onCreate(context.getThisTaskId());
-    }
-
-    @Override
-    public void execute(Tuple input) {
-      Object sentObject = input.getValue(0);
-      ContentEvent sentEvent = (ContentEvent) sentObject;
-      processor.process(sentEvent);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      for (StormStream stream : streams) {
-        declarer.declareStream(stream.getOutputId(),
-            new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
-                StormSamoaUtils.KEY_FIELD));
-      }
-    }
-
-    StormStream createStream(String piId) {
-      StormBoltStream stream = new StormBoltStream(piId);
-      streams.add(stream);
-      return stream;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
deleted file mode 100644
index bb9e3f4..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.github.javacliparser.ClassOption;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.tasks.Task;
-
-/**
- * Utility class for samoa-storm project. It is used by StormDoTask to process 
its arguments.
- * 
- * @author Arinto Murdopo
- * 
- */
-public class StormSamoaUtils {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(StormSamoaUtils.class);
-
-  static final String KEY_FIELD = "key";
-  static final String CONTENT_EVENT_FIELD = "content_event";
-
-  static Properties getProperties() throws IOException {
-    Properties props = new Properties();
-    InputStream is;
-
-    File f = new File("src/main/resources/samoa-storm-cluster.properties"); // 
FIXME it does not exist anymore
-    is = new FileInputStream(f);
-
-    try {
-      props.load(is);
-    } catch (IOException e1) {
-      System.out.println("Fail to load property file");
-      return null;
-    } finally {
-      is.close();
-    }
-
-    return props;
-  }
-
-  public static StormTopology argsToTopology(String[] args) {
-    StringBuilder cliString = new StringBuilder();
-    for (String arg : args) {
-      cliString.append(" ").append(arg);
-    }
-    logger.debug("Command line string = {}", cliString.toString());
-
-    Task task = getTask(cliString.toString());
-
-    // TODO: remove setFactory method with DynamicBinding
-    task.setFactory(new StormComponentFactory());
-    task.init();
-
-    return (StormTopology) task.getTopology();
-  }
-
-  public static int numWorkers(List<String> tmpArgs) {
-    int position = tmpArgs.size() - 1;
-    int numWorkers;
-
-    try {
-      numWorkers = Integer.parseInt(tmpArgs.get(position));
-      tmpArgs.remove(position);
-    } catch (NumberFormatException e) {
-      numWorkers = 4;
-    }
-
-    return numWorkers;
-  }
-
-  public static Task getTask(String cliString) {
-    Task task = null;
-    try {
-      logger.debug("Providing task [{}]", cliString);
-      task = ClassOption.cliStringToObject(cliString, Task.class, null);
-    } catch (Exception e) {
-      logger.warn("Fail in initializing the task!");
-      e.printStackTrace();
-    }
-    return task;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
deleted file mode 100644
index 0aab940..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
+++ /dev/null
@@ -1,65 +0,0 @@
-//package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-//
-//import com.yahoo.labs.samoa.core.ContentEvent;
-//import 
com.yahoo.labs.samoa.topology.impl.StormEntranceProcessingItem.StormEntranceSpout;
-//
-///**
-// * Storm Stream that connects into Spout. It wraps the spout itself
-// * @author Arinto Murdopo
-// *
-// */
-//final class StormSpoutStream extends StormStream{
-//
-//     /**
-//      * 
-//      */
-//     private static final long serialVersionUID = -7444653177614988650L;
-//     
-//     private StormEntranceSpout spout;
-//     
-//     StormSpoutStream(String stormComponentId) {
-//             super(stormComponentId);
-//     }
-//
-//     @Override
-//     public void put(ContentEvent contentEvent) {
-//             spout.put(this, contentEvent);
-//     }
-//     
-//    void setSpout(StormEntranceSpout spout){
-//             this.spout = spout;
-//     }
-//
-////   @Override
-////   public void setStreamId(String stream) {
-////           // TODO Auto-generated method stub
-////           
-////   }
-//
-//     @Override
-//     public String getStreamId() {
-//             // TODO Auto-generated method stub
-//             return null;
-//     }
-//
-// }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
deleted file mode 100644
index ca1edc9..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.UUID;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * Abstract class to implement Storm Stream
- * 
- * @author Arinto Murdopo
- * 
- */
-abstract class StormStream implements Stream, java.io.Serializable {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = 281835563756514852L;
-  protected final String outputStreamId;
-  protected final InputStreamId inputStreamId;
-
-  public StormStream(String stormComponentId) {
-    this.outputStreamId = UUID.randomUUID().toString();
-    this.inputStreamId = new InputStreamId(stormComponentId, 
this.outputStreamId);
-  }
-
-  @Override
-  public abstract void put(ContentEvent contentEvent);
-
-  String getOutputId() {
-    return this.outputStreamId;
-  }
-
-  InputStreamId getInputId() {
-    return this.inputStreamId;
-  }
-
-  final static class InputStreamId implements java.io.Serializable {
-
-    /**
-                * 
-                */
-    private static final long serialVersionUID = -7457995634133691295L;
-    private final String componentId;
-    private final String streamId;
-
-    InputStreamId(String componentId, String streamId) {
-      this.componentId = componentId;
-      this.streamId = streamId;
-    }
-
-    String getComponentId() {
-      return componentId;
-    }
-
-    String getStreamId() {
-      return streamId;
-    }
-  }
-
-  @Override
-  public void setBatchSize(int batchSize) {
-    // Ignore batch size
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
deleted file mode 100644
index 8a76c55..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import backtype.storm.topology.TopologyBuilder;
-
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.AbstractTopology;
-
-/**
- * Adaptation of SAMOA topology in samoa-storm
- * 
- * @author Arinto Murdopo
- * 
- */
-public class StormTopology extends AbstractTopology {
-
-  private TopologyBuilder builder;
-
-  public StormTopology(String topologyName) {
-    super(topologyName);
-    this.builder = new TopologyBuilder();
-  }
-
-  @Override
-  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) 
{
-    StormTopologyNode stormNode = (StormTopologyNode) procItem;
-    stormNode.addToTopology(this, parallelismHint);
-    super.addProcessingItem(procItem, parallelismHint);
-  }
-
-  public TopologyBuilder getStormBuilder() {
-    return builder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
deleted file mode 100644
index 8122258..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Interface to represent a node in samoa-storm topology.
- * 
- * @author Arinto Murdopo
- * 
- */
-interface StormTopologyNode {
-
-  void addToTopology(StormTopology topology, int parallelismHint);
-
-  StormStream createStream();
-
-  String getId();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
deleted file mode 100644
index bbac521..0000000
--- 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.thrift7.TException;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Helper class to submit SAMOA task into Storm without the need of submitting 
the jar file. The jar file must be
- * submitted first using StormJarSubmitter class.
- * 
- * @author Arinto Murdopo
- * 
- */
-public class StormTopologySubmitter {
-
-  public static String YJP_OPTIONS_KEY = "YjpOptions";
-
-  private static Logger logger = 
LoggerFactory.getLogger(StormTopologySubmitter.class);
-
-  public static void main(String[] args) throws IOException {
-    Properties props = StormSamoaUtils.getProperties();
-
-    String uploadedJarLocation = 
props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
-    if (uploadedJarLocation == null) {
-      logger.error("Invalid properties file. It must have key {}",
-          StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
-      return;
-    }
-
-    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-    int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
-
-    args = tmpArgs.toArray(new String[0]);
-    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
-
-    Config conf = new Config();
-    conf.putAll(Utils.readStormConfig());
-    conf.putAll(Utils.readCommandLineOpts());
-    conf.setDebug(false);
-    conf.setNumWorkers(numWorkers);
-
-    String profilerOption =
-        props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
-    if (profilerOption != null) {
-      String topoWorkerChildOpts = (String) 
conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
-      StringBuilder optionBuilder = new StringBuilder();
-      if (topoWorkerChildOpts != null) {
-        optionBuilder.append(topoWorkerChildOpts);
-        optionBuilder.append(' ');
-      }
-      optionBuilder.append(profilerOption);
-      conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString());
-    }
-
-    Map<String, Object> myConfigMap = new HashMap<String, Object>(conf);
-    StringWriter out = new StringWriter();
-
-    try {
-      JSONValue.writeJSONString(myConfigMap, out);
-    } catch (IOException e) {
-      System.out.println("Error in writing JSONString");
-      e.printStackTrace();
-      return;
-    }
-
-    Config config = new Config();
-    config.putAll(Utils.readStormConfig());
-
-    NimbusClient nc = NimbusClient.getConfiguredClient(config);
-    String topologyName = stormTopo.getTopologyName();
-    try {
-      System.out.println("Submitting topology with name: "
-          + topologyName);
-      nc.getClient().submitTopology(topologyName, uploadedJarLocation,
-          out.toString(), stormTopo.getStormBuilder().createTopology());
-      System.out.println(topologyName + " is successfully submitted");
-
-    } catch (AlreadyAliveException aae) {
-      System.out.println("Fail to submit " + topologyName
-          + "\nError message: " + aae.get_msg());
-    } catch (InvalidTopologyException ite) {
-      System.out.println("Invalid topology for " + topologyName);
-      ite.printStackTrace();
-    } catch (TException te) {
-      System.out.println("Texception for " + topologyName);
-      te.printStackTrace();
-    }
-  }
-
-  private static String uploadedJarLocation(List<String> tmpArgs) {
-    int position = tmpArgs.size() - 1;
-    String uploadedJarLocation = tmpArgs.get(position);
-    tmpArgs.remove(position);
-    return uploadedJarLocation;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java 
b/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
new file mode 100644
index 0000000..f2b9c0c
--- /dev/null
+++ b/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
@@ -0,0 +1,78 @@
+package org.apache.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.samoa.topology.impl.StormSamoaUtils;
+import org.apache.samoa.topology.impl.StormTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+/**
+ * The main class to execute a SAMOA task in LOCAL mode in Storm.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class LocalStormDoTask {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LocalStormDoTask.class);
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
+
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+
+    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    // convert the arguments into Storm topology
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+    String topologyName = stormTopo.getTopologyName();
+
+    Config conf = new Config();
+    // conf.putAll(Utils.readStormConfig());
+    conf.setDebug(false);
+
+    // local mode
+    conf.setMaxTaskParallelism(numWorker);
+
+    backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
+    cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
+
+    backtype.storm.utils.Utils.sleep(600 * 1000);
+
+    cluster.killTopology(topologyName);
+    cluster.shutdown();
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java
new file mode 100644
index 0000000..de9ef0a
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormBoltStream.java
@@ -0,0 +1,67 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+
+/**
+ * Storm Stream that connects into Bolt. It wraps Storm's outputCollector class
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+class StormBoltStream extends StormStream {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = -5712513402991550847L;
+
+  private OutputCollector outputCollector;
+
+  StormBoltStream(String stormComponentId) {
+    super(stormComponentId);
+  }
+
+  @Override
+  public void put(ContentEvent contentEvent) {
+    outputCollector.emit(this.outputStreamId, new Values(contentEvent, 
contentEvent.getKey()));
+  }
+
+  public void setCollector(OutputCollector outputCollector) {
+    this.outputCollector = outputCollector;
+  }
+
+  // @Override
+  // public void setStreamId(String streamId) {
+  // // TODO Auto-generated method stub
+  // //this.outputStreamId = streamId;
+  // }
+
+  @Override
+  public String getStreamId() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java
new file mode 100644
index 0000000..353cffc
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormComponentFactory.java
@@ -0,0 +1,90 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+/**
+ * Component factory implementation for samoa-storm
+ */
+public final class StormComponentFactory implements ComponentFactory {
+
+  private final Map<String, Integer> processorList;
+
+  public StormComponentFactory() {
+    processorList = new HashMap<>();
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), 1);
+  }
+
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new StormEntranceProcessingItem(processor, 
this.getComponentName(processor.getClass()));
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi;
+    return stormCompatiblePi.createStream();
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new StormTopology(topoName);
+  }
+
+  private String getComponentName(Class<? extends Processor> clazz) {
+    StringBuilder componentName = new StringBuilder(clazz.getCanonicalName());
+    String key = componentName.toString();
+    Integer index;
+
+    if (!processorList.containsKey(key)) {
+      index = 1;
+    } else {
+      index = processorList.get(key) + 1;
+    }
+
+    processorList.put(key, index);
+
+    componentName.append('_');
+    componentName.append(index);
+
+    return componentName.toString();
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor, int parallelism) {
+    return new StormProcessingItem(processor, 
this.getComponentName(processor.getClass()), parallelism);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java
new file mode 100644
index 0000000..436db59
--- /dev/null
+++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormDoTask.java
@@ -0,0 +1,118 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+/**
+ * The main class that used by samoa script to execute SAMOA task.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class StormDoTask {
+  private static final Logger logger = 
LoggerFactory.getLogger(StormDoTask.class);
+  private static String localFlag = "local";
+  private static String clusterFlag = "cluster";
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
+
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+
+    boolean isLocal = isLocal(tmpArgs);
+    int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    // convert the arguments into Storm topology
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+    String topologyName = stormTopo.getTopologyName();
+
+    Config conf = new Config();
+    conf.putAll(Utils.readStormConfig());
+    conf.setDebug(false);
+
+    if (isLocal) {
+      // local mode
+      conf.setMaxTaskParallelism(numWorker);
+
+      backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
+      cluster.submitTopology(topologyName, conf, 
stormTopo.getStormBuilder().createTopology());
+
+      backtype.storm.utils.Utils.sleep(600 * 1000);
+
+      cluster.killTopology(topologyName);
+      cluster.shutdown();
+
+    } else {
+      // cluster mode
+      conf.setNumWorkers(numWorker);
+      try {
+        backtype.storm.StormSubmitter.submitTopology(topologyName, conf,
+            stormTopo.getStormBuilder().createTopology());
+      } catch (backtype.storm.generated.AlreadyAliveException ale) {
+        ale.printStackTrace();
+      } catch (backtype.storm.generated.InvalidTopologyException ite) {
+        ite.printStackTrace();
+      }
+    }
+  }
+
+  private static boolean isLocal(List<String> tmpArgs) {
+    ExecutionMode executionMode = ExecutionMode.UNDETERMINED;
+
+    int position = tmpArgs.size() - 1;
+    String flag = tmpArgs.get(position);
+    boolean isLocal = true;
+
+    if (flag.equals(clusterFlag)) {
+      executionMode = ExecutionMode.CLUSTER;
+      isLocal = false;
+    } else if (flag.equals(localFlag)) {
+      executionMode = ExecutionMode.LOCAL;
+      isLocal = true;
+    }
+
+    if (executionMode != ExecutionMode.UNDETERMINED) {
+      tmpArgs.remove(position);
+    }
+
+    return isLocal;
+  }
+
+  private enum ExecutionMode {
+    LOCAL, CLUSTER, UNDETERMINED
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java
new file mode 100644
index 0000000..1183fac
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormEntranceProcessingItem.java
@@ -0,0 +1,212 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.Stream;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * EntranceProcessingItem implementation for Storm.
+ */
+class StormEntranceProcessingItem extends AbstractEntranceProcessingItem 
implements StormTopologyNode {
+  private final StormEntranceSpout piSpout;
+
+  StormEntranceProcessingItem(EntranceProcessor processor) {
+    this(processor, UUID.randomUUID().toString());
+  }
+
+  StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) {
+    super(processor);
+    this.setName(friendlyId);
+    this.piSpout = new StormEntranceSpout(processor);
+  }
+
+  @Override
+  public EntranceProcessingItem setOutputStream(Stream stream) {
+    // piSpout.streams.add(stream);
+    piSpout.setOutputStream((StormStream) stream);
+    return this;
+  }
+
+  @Override
+  public Stream getOutputStream() {
+    return piSpout.getOutputStream();
+  }
+
+  @Override
+  public void addToTopology(StormTopology topology, int parallelismHint) {
+    topology.getStormBuilder().setSpout(this.getName(), piSpout, 
parallelismHint);
+  }
+
+  @Override
+  public StormStream createStream() {
+    return piSpout.createStream(this.getName());
+  }
+
+  @Override
+  public String getId() {
+    return this.getName();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.insert(0, String.format("id: %s, ", this.getName()));
+    return sb.toString();
+  }
+
+  /**
+   * Resulting Spout of StormEntranceProcessingItem
+   */
+  final static class StormEntranceSpout extends BaseRichSpout {
+
+    private static final long serialVersionUID = -9066409791668954099L;
+
+    // private final Set<StormSpoutStream> streams;
+    private final EntranceProcessor entranceProcessor;
+    private StormStream outputStream;
+
+    // private transient SpoutStarter spoutStarter;
+    // private transient Executor spoutExecutors;
+    // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue;
+
+    private SpoutOutputCollector collector;
+
+    StormEntranceSpout(EntranceProcessor processor) {
+      // this.streams = new HashSet<StormSpoutStream>();
+      this.entranceProcessor = processor;
+    }
+
+    public StormStream getOutputStream() {
+      return outputStream;
+    }
+
+    public void setOutputStream(StormStream stream) {
+      this.outputStream = stream;
+    }
+
+    @Override
+    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext 
context, SpoutOutputCollector collector) {
+      this.collector = collector;
+      // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>();
+
+      // Processor and this class share the same instance of stream
+      // for (StormSpoutStream stream : streams) {
+      // stream.setSpout(this);
+      // }
+      // outputStream.setSpout(this);
+
+      this.entranceProcessor.onCreate(context.getThisTaskId());
+      // this.spoutStarter = new SpoutStarter(this.starter);
+
+      // this.spoutExecutors = Executors.newSingleThreadExecutor();
+      // this.spoutExecutors.execute(spoutStarter);
+    }
+
+    @Override
+    public void nextTuple() {
+      if (entranceProcessor.hasNext()) {
+        Values value = newValues(entranceProcessor.nextEvent());
+        collector.emit(outputStream.getOutputId(), value);
+      } else
+        Utils.sleep(1000);
+      // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50,
+      // TimeUnit.MILLISECONDS);
+      // if (tupleInfo != null) {
+      // Values value = newValues(tupleInfo.getContentEvent());
+      // collector.emit(tupleInfo.getStormStream().getOutputId(), value);
+      // }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      // for (StormStream stream : streams) {
+      // declarer.declareStream(stream.getOutputId(), new
+      // Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+      // StormSamoaUtils.KEY_FIELD));
+      // }
+      declarer.declareStream(outputStream.getOutputId(), new 
Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+          StormSamoaUtils.KEY_FIELD));
+    }
+
+    StormStream createStream(String piId) {
+      // StormSpoutStream stream = new StormSpoutStream(piId);
+      StormStream stream = new StormBoltStream(piId);
+      // streams.add(stream);
+      return stream;
+    }
+
+    // void put(StormSpoutStream stream, ContentEvent contentEvent) {
+    // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent));
+    // }
+
+    private Values newValues(ContentEvent contentEvent) {
+      return new Values(contentEvent, contentEvent.getKey());
+    }
+
+    // private final static class StormTupleInfo {
+    //
+    // private final StormStream stream;
+    // private final ContentEvent event;
+    //
+    // StormTupleInfo(StormStream stream, ContentEvent event) {
+    // this.stream = stream;
+    // this.event = event;
+    // }
+    //
+    // public StormStream getStormStream() {
+    // return this.stream;
+    // }
+    //
+    // public ContentEvent getContentEvent() {
+    // return this.event;
+    // }
+    // }
+
+    // private final static class SpoutStarter implements Runnable {
+    //
+    // private final TopologyStarter topoStarter;
+    //
+    // SpoutStarter(TopologyStarter topoStarter) {
+    // this.topoStarter = topoStarter;
+    // }
+    //
+    // @Override
+    // public void run() {
+    // this.topoStarter.start();
+    // }
+    // }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java
new file mode 100644
index 0000000..a3de798
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormJarSubmitter.java
@@ -0,0 +1,75 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.utils.Utils;
+
+/**
+ * Utility class to submit samoa-storm jar to a Storm cluster.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class StormJarSubmitter {
+
+  public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation";
+
+  /**
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+
+    Config config = new Config();
+    config.putAll(Utils.readCommandLineOpts());
+    config.putAll(Utils.readStormConfig());
+
+    String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
+    int nimbusThriftPort = Utils.getInt(config
+        .get(Config.NIMBUS_THRIFT_PORT));
+
+    System.out.println("Nimbus host " + nimbusHost);
+    System.out.println("Nimbus thrift port " + nimbusThriftPort);
+
+    System.out.println("uploading jar from " + args[0]);
+    String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]);
+
+    System.out.println("Uploaded jar file location: ");
+    System.out.println(uploadedJarLocation);
+
+    Properties props = StormSamoaUtils.getProperties();
+    props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, 
uploadedJarLocation);
+
+    File f = new File("src/main/resources/samoa-storm-cluster.properties");
+    f.createNewFile();
+
+    OutputStream out = new FileOutputStream(f);
+    props.store(out, "properties file to store uploaded jar location from 
StormJarSubmitter");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java
new file mode 100644
index 0000000..28c3746
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormProcessingItem.java
@@ -0,0 +1,169 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.impl.StormStream.InputStreamId;
+import org.apache.samoa.utils.PartitioningScheme;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * ProcessingItem implementation for Storm.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+class StormProcessingItem extends AbstractProcessingItem implements 
StormTopologyNode {
+  private final ProcessingItemBolt piBolt;
+  private BoltDeclarer piBoltDeclarer;
+
+  // TODO: should we put parallelism hint here?
+  // imo, parallelism hint only declared when we add this PI in the topology
+  // open for dicussion :p
+
+  StormProcessingItem(Processor processor, int parallelismHint) {
+    this(processor, UUID.randomUUID().toString(), parallelismHint);
+  }
+
+  StormProcessingItem(Processor processor, String friendlyId, int 
parallelismHint) {
+    super(processor, parallelismHint);
+    this.piBolt = new ProcessingItemBolt(processor);
+    this.setName(friendlyId);
+  }
+
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    StormStream stormInputStream = (StormStream) inputStream;
+    InputStreamId inputId = stormInputStream.getInputId();
+
+    switch (scheme) {
+    case SHUFFLE:
+      piBoltDeclarer.shuffleGrouping(inputId.getComponentId(), 
inputId.getStreamId());
+      break;
+    case GROUP_BY_KEY:
+      piBoltDeclarer.fieldsGrouping(
+          inputId.getComponentId(),
+          inputId.getStreamId(),
+          new Fields(StormSamoaUtils.KEY_FIELD));
+      break;
+    case BROADCAST:
+      piBoltDeclarer.allGrouping(
+          inputId.getComponentId(),
+          inputId.getStreamId());
+      break;
+    }
+    return this;
+  }
+
+  @Override
+  public void addToTopology(StormTopology topology, int parallelismHint) {
+    if (piBoltDeclarer != null) {
+      // throw exception that one PI only belong to one topology
+    } else {
+      TopologyBuilder stormBuilder = topology.getStormBuilder();
+      this.piBoltDeclarer = stormBuilder.setBolt(this.getName(),
+          this.piBolt, parallelismHint);
+    }
+  }
+
+  @Override
+  public StormStream createStream() {
+    return piBolt.createStream(this.getName());
+  }
+
+  @Override
+  public String getId() {
+    return this.getName();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.insert(0, String.format("id: %s, ", this.getName()));
+    return sb.toString();
+  }
+
+  private final static class ProcessingItemBolt extends BaseRichBolt {
+
+    private static final long serialVersionUID = -6637673741263199198L;
+
+    private final Set<StormBoltStream> streams;
+    private final Processor processor;
+
+    private OutputCollector collector;
+
+    ProcessingItemBolt(Processor processor) {
+      this.streams = new HashSet<StormBoltStream>();
+      this.processor = processor;
+    }
+
+    @Override
+    public void prepare(@SuppressWarnings("rawtypes") Map stormConf, 
TopologyContext context,
+        OutputCollector collector) {
+      this.collector = collector;
+      // Processor and this class share the same instance of stream
+      for (StormBoltStream stream : streams) {
+        stream.setCollector(this.collector);
+      }
+
+      this.processor.onCreate(context.getThisTaskId());
+    }
+
+    @Override
+    public void execute(Tuple input) {
+      Object sentObject = input.getValue(0);
+      ContentEvent sentEvent = (ContentEvent) sentObject;
+      processor.process(sentEvent);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      for (StormStream stream : streams) {
+        declarer.declareStream(stream.getOutputId(),
+            new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+                StormSamoaUtils.KEY_FIELD));
+      }
+    }
+
+    StormStream createStream(String piId) {
+      StormBoltStream stream = new StormBoltStream(piId);
+      streams.add(stream);
+      return stream;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java
new file mode 100644
index 0000000..86a5578
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java
@@ -0,0 +1,109 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.samoa.tasks.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for samoa-storm project. It is used by StormDoTask to process 
its arguments.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class StormSamoaUtils {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(StormSamoaUtils.class);
+
+  static final String KEY_FIELD = "key";
+  static final String CONTENT_EVENT_FIELD = "content_event";
+
+  static Properties getProperties() throws IOException {
+    Properties props = new Properties();
+    InputStream is;
+
+    File f = new File("src/main/resources/samoa-storm-cluster.properties"); // 
FIXME it does not exist anymore
+    is = new FileInputStream(f);
+
+    try {
+      props.load(is);
+    } catch (IOException e1) {
+      System.out.println("Fail to load property file");
+      return null;
+    } finally {
+      is.close();
+    }
+
+    return props;
+  }
+
+  public static StormTopology argsToTopology(String[] args) {
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+
+    Task task = getTask(cliString.toString());
+
+    // TODO: remove setFactory method with DynamicBinding
+    task.setFactory(new StormComponentFactory());
+    task.init();
+
+    return (StormTopology) task.getTopology();
+  }
+
+  public static int numWorkers(List<String> tmpArgs) {
+    int position = tmpArgs.size() - 1;
+    int numWorkers;
+
+    try {
+      numWorkers = Integer.parseInt(tmpArgs.get(position));
+      tmpArgs.remove(position);
+    } catch (NumberFormatException e) {
+      numWorkers = 4;
+    }
+
+    return numWorkers;
+  }
+
+  public static Task getTask(String cliString) {
+    Task task = null;
+    try {
+      logger.debug("Providing task [{}]", cliString);
+      task = ClassOption.cliStringToObject(cliString, Task.class, null);
+    } catch (Exception e) {
+      logger.warn("Fail in initializing the task!");
+      e.printStackTrace();
+    }
+    return task;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java
new file mode 100644
index 0000000..14fc424
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSpoutStream.java
@@ -0,0 +1,66 @@
+package org.apache.samoa.topology.impl;
+//package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+//
+//import org.apache.samoa.core.ContentEvent;
+//import 
org.apache.samoa.topology.impl.StormEntranceProcessingItem.StormEntranceSpout;
+//
+///**
+// * Storm Stream that connects into Spout. It wraps the spout itself
+// * @author Arinto Murdopo
+// *
+// */
+//final class StormSpoutStream extends StormStream{
+//
+//     /**
+//      * 
+//      */
+//     private static final long serialVersionUID = -7444653177614988650L;
+//     
+//     private StormEntranceSpout spout;
+//     
+//     StormSpoutStream(String stormComponentId) {
+//             super(stormComponentId);
+//     }
+//
+//     @Override
+//     public void put(ContentEvent contentEvent) {
+//             spout.put(this, contentEvent);
+//     }
+//     
+//    void setSpout(StormEntranceSpout spout){
+//             this.spout = spout;
+//     }
+//
+////   @Override
+////   public void setStreamId(String stream) {
+////           // TODO Auto-generated method stub
+////           
+////   }
+//
+//     @Override
+//     public String getStreamId() {
+//             // TODO Auto-generated method stub
+//             return null;
+//     }
+//
+// }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java
new file mode 100644
index 0000000..1c62389
--- /dev/null
+++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormStream.java
@@ -0,0 +1,86 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.UUID;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.Stream;
+
+/**
+ * Abstract class to implement Storm Stream
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+abstract class StormStream implements Stream, java.io.Serializable {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = 281835563756514852L;
+  protected final String outputStreamId;
+  protected final InputStreamId inputStreamId;
+
+  public StormStream(String stormComponentId) {
+    this.outputStreamId = UUID.randomUUID().toString();
+    this.inputStreamId = new InputStreamId(stormComponentId, 
this.outputStreamId);
+  }
+
+  @Override
+  public abstract void put(ContentEvent contentEvent);
+
+  String getOutputId() {
+    return this.outputStreamId;
+  }
+
+  InputStreamId getInputId() {
+    return this.inputStreamId;
+  }
+
+  final static class InputStreamId implements java.io.Serializable {
+
+    /**
+                * 
+                */
+    private static final long serialVersionUID = -7457995634133691295L;
+    private final String componentId;
+    private final String streamId;
+
+    InputStreamId(String componentId, String streamId) {
+      this.componentId = componentId;
+      this.streamId = streamId;
+    }
+
+    String getComponentId() {
+      return componentId;
+    }
+
+    String getStreamId() {
+      return streamId;
+    }
+  }
+
+  @Override
+  public void setBatchSize(int batchSize) {
+    // Ignore batch size
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java
new file mode 100644
index 0000000..51bb909
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopology.java
@@ -0,0 +1,53 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.IProcessingItem;
+
+import backtype.storm.topology.TopologyBuilder;
+
+/**
+ * Adaptation of SAMOA topology in samoa-storm
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class StormTopology extends AbstractTopology {
+
+  private TopologyBuilder builder;
+
+  public StormTopology(String topologyName) {
+    super(topologyName);
+    this.builder = new TopologyBuilder();
+  }
+
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) 
{
+    StormTopologyNode stormNode = (StormTopologyNode) procItem;
+    stormNode.addToTopology(this, parallelismHint);
+    super.addProcessingItem(procItem, parallelismHint);
+  }
+
+  public TopologyBuilder getStormBuilder() {
+    return builder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
new file mode 100644
index 0000000..994fb9f
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologyNode.java
@@ -0,0 +1,37 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface to represent a node in samoa-storm topology.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+interface StormTopologyNode {
+
+  void addToTopology(StormTopology topology, int parallelismHint);
+
+  StormStream createStream();
+
+  String getId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
new file mode 100644
index 0000000..25985df
--- /dev/null
+++ 
b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormTopologySubmitter.java
@@ -0,0 +1,132 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift7.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Helper class to submit SAMOA task into Storm without the need of submitting 
the jar file. The jar file must be
+ * submitted first using StormJarSubmitter class.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class StormTopologySubmitter {
+
+  public static String YJP_OPTIONS_KEY = "YjpOptions";
+
+  private static Logger logger = 
LoggerFactory.getLogger(StormTopologySubmitter.class);
+
+  public static void main(String[] args) throws IOException {
+    Properties props = StormSamoaUtils.getProperties();
+
+    String uploadedJarLocation = 
props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+    if (uploadedJarLocation == null) {
+      logger.error("Invalid properties file. It must have key {}",
+          StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+      return;
+    }
+
+    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+    int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
+
+    args = tmpArgs.toArray(new String[0]);
+    StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+
+    Config conf = new Config();
+    conf.putAll(Utils.readStormConfig());
+    conf.putAll(Utils.readCommandLineOpts());
+    conf.setDebug(false);
+    conf.setNumWorkers(numWorkers);
+
+    String profilerOption =
+        props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
+    if (profilerOption != null) {
+      String topoWorkerChildOpts = (String) 
conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+      StringBuilder optionBuilder = new StringBuilder();
+      if (topoWorkerChildOpts != null) {
+        optionBuilder.append(topoWorkerChildOpts);
+        optionBuilder.append(' ');
+      }
+      optionBuilder.append(profilerOption);
+      conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, optionBuilder.toString());
+    }
+
+    Map<String, Object> myConfigMap = new HashMap<String, Object>(conf);
+    StringWriter out = new StringWriter();
+
+    try {
+      JSONValue.writeJSONString(myConfigMap, out);
+    } catch (IOException e) {
+      System.out.println("Error in writing JSONString");
+      e.printStackTrace();
+      return;
+    }
+
+    Config config = new Config();
+    config.putAll(Utils.readStormConfig());
+
+    NimbusClient nc = NimbusClient.getConfiguredClient(config);
+    String topologyName = stormTopo.getTopologyName();
+    try {
+      System.out.println("Submitting topology with name: "
+          + topologyName);
+      nc.getClient().submitTopology(topologyName, uploadedJarLocation,
+          out.toString(), stormTopo.getStormBuilder().createTopology());
+      System.out.println(topologyName + " is successfully submitted");
+
+    } catch (AlreadyAliveException aae) {
+      System.out.println("Fail to submit " + topologyName
+          + "\nError message: " + aae.get_msg());
+    } catch (InvalidTopologyException ite) {
+      System.out.println("Invalid topology for " + topologyName);
+      ite.printStackTrace();
+    } catch (TException te) {
+      System.out.println("Texception for " + topologyName);
+      te.printStackTrace();
+    }
+  }
+
+  private static String uploadedJarLocation(List<String> tmpArgs) {
+    int position = tmpArgs.size() - 1;
+    String uploadedJarLocation = tmpArgs.get(position);
+    tmpArgs.remove(position);
+    return uploadedJarLocation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
deleted file mode 100644
index f767fa8..0000000
--- a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.yahoo.labs.samoa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.junit.Test;
-
-public class AlgosTest {
-
-  @Test(timeout = 60000)
-  public void testVHTWithStorm() throws Exception {
-
-    TestParams vhtConfig = new TestParams.Builder()
-        .inputInstances(200_000)
-        .samplingSize(20_000)
-        .evaluationInstances(200_000)
-        .classifiedInstances(200_000)
-        .classificationsCorrect(55f)
-        .kappaStat(0f)
-        .kappaTempStat(0f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
-        .resultFilePollTimeout(30)
-        .prePollWait(15)
-        .taskClassName(LocalStormDoTask.class.getName())
-        .build();
-    TestUtils.test(vhtConfig);
-
-  }
-
-  @Test(timeout = 120000)
-  public void testBaggingWithStorm() throws Exception {
-    TestParams baggingConfig = new TestParams.Builder()
-        .inputInstances(200_000)
-        .samplingSize(20_000)
-        .evaluationInstances(180_000)
-        .classifiedInstances(190_000)
-        .classificationsCorrect(60f)
-        .kappaStat(0f)
-        .kappaTempStat(0f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
-        .resultFilePollTimeout(40)
-        .prePollWait(20)
-        .taskClassName(LocalStormDoTask.class.getName())
-        .build();
-    TestUtils.test(baggingConfig);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
deleted file mode 100644
index 27696bd..0000000
--- 
a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.assertEquals;
-import mockit.Expectations;
-import mockit.MockUp;
-import mockit.Mocked;
-import mockit.Tested;
-import mockit.Verifications;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.TopologyBuilder;
-
-import com.yahoo.labs.samoa.core.Processor;
-
-public class StormProcessingItemTest {
-  private static final int PARRALLELISM_HINT_2 = 2;
-  private static final int PARRALLELISM_HINT_4 = 4;
-  private static final String ID = "id";
-  @Tested
-  private StormProcessingItem pi;
-  @Mocked
-  private Processor processor;
-  @Mocked
-  private StormTopology topology;
-  @Mocked
-  private TopologyBuilder stormBuilder = new TopologyBuilder();
-
-  @Before
-  public void setUp() {
-    pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2);
-  }
-
-  @Test
-  public void testAddToTopology() {
-    new Expectations() {
-      {
-        topology.getStormBuilder();
-        result = stormBuilder;
-
-        stormBuilder.setBolt(ID, (IRichBolt) any, anyInt);
-        result = new MockUp<BoltDeclarer>() {
-        }.getMockInstance();
-      }
-    };
-
-    pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint 
is ignored
-
-    new Verifications() {
-      {
-        assertEquals(pi.getProcessor(), processor);
-        // TODO add methods to explore a topology and verify them
-        assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2);
-        assertEquals(pi.getId(), ID);
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java 
b/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java
new file mode 100644
index 0000000..2208be9
--- /dev/null
+++ b/samoa-storm/src/test/java/org/apache/samoa/AlgosTest.java
@@ -0,0 +1,69 @@
+package org.apache.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.LocalStormDoTask;
+import org.apache.samoa.TestParams;
+import org.apache.samoa.TestUtils;
+import org.junit.Test;
+
+public class AlgosTest {
+
+  @Test(timeout = 60000)
+  public void testVHTWithStorm() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30)
+        .prePollWait(15)
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+  @Test(timeout = 120000)
+  public void testBaggingWithStorm() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(190_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .resultFilePollTimeout(40)
+        .prePollWait(20)
+        .taskClassName(LocalStormDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
+
+  }
+
+}


Reply via email to