http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
new file mode 100644
index 0000000..be13673
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingNode.java
@@ -0,0 +1,56 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+
+/**
+ * Common interface of SamzaEntranceProcessingItem and
+ * SamzaProcessingItem
+ * 
+ * @author Anh Thu Vu
+ */
+public interface SamzaProcessingNode extends IProcessingItem {
+       /**
+        * Registers an output stream with this processing item
+        * 
+        * @param stream
+        *               the output stream
+        * @return the number of output streams of this processing item
+        */
+       public int addOutputStream(SamzaStream stream);
+       
+       /**
+        * Gets the name/id of this processing item
+        * 
+        * @return the name/id of this processing item
+        */
+       // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI
+       public String getName();
+       
+       /**
+        * Sets the name/id for this processing item
+        * @param name
+        *            the name/id of this processing item
+        */
+       // TODO: include getName() and setName() in IProcessingItem and/or AbstractEPI/PI
+       public void setName(String name);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
new file mode 100644
index 0000000..c1bf5a2
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaStream.java
@@ -0,0 +1,248 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * Stream for SAMOA on Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaStream extends AbstractStream implements Serializable  {
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 1L;
+
+       private static final String DEFAULT_SYSTEM_NAME = "kafka";
+       
+       private List<SamzaSystemStream> systemStreams;
+       private transient MessageCollector collector;
+       private String systemName;
+       
+       /*
+        * Constructor
+        */
+       public SamzaStream(IProcessingItem sourcePi) {
+               super(sourcePi);
+               this.systemName = DEFAULT_SYSTEM_NAME;
+               // Get name/id for this stream
+               SamzaProcessingNode samzaPi = (SamzaProcessingNode) sourcePi;
+               int index = samzaPi.addOutputStream(this);
+               this.setStreamId(samzaPi.getName()+"-"+Integer.toString(index));
+               // init list of SamzaSystemStream
+               systemStreams = new ArrayList<SamzaSystemStream>();
+       }
+       
+       /*
+        * System name (Kafka)
+        */
+       public void setSystemName(String systemName) {
+               this.systemName = systemName;
+               for (SamzaSystemStream systemStream:systemStreams) {
+                       systemStream.setSystem(systemName);
+               }
+       }
+
+       public String getSystemName() {
+               return this.systemName;
+       }
+
+       /*
+        * Add the destination PI to the list of destinations.
+        * Return the corresponding SamzaSystemStream.
+        */
+       public SamzaSystemStream addDestination(StreamDestination destination) {
+               PartitioningScheme scheme = destination.getPartitioningScheme();
+               int parallelism = destination.getParallelism();
+               
+               SamzaSystemStream resultStream = null;
+               for (int i=0; i<systemStreams.size(); i++) {
+                       // There is an existing SystemStream that matches the settings.
+                       // Do not create a new one
+                       if (systemStreams.get(i).isSame(scheme, parallelism)) {
+                               resultStream = systemStreams.get(i);
+                       }
+               }
+               
+               // No existing SystemStream matches the requirements:
+               // create a new one
+               if (resultStream == null) {
+                       String topicName = this.getStreamId() + "-" + Integer.toString(systemStreams.size());
+                       resultStream = new SamzaSystemStream(this.systemName,topicName,scheme,parallelism);
+                       systemStreams.add(resultStream);
+               }
+               
+               return resultStream;
+       }
+       
+       public void setCollector(MessageCollector collector) {
+               this.collector = collector;
+       }
+       
+       public MessageCollector getCollector(){
+               return this.collector;
+       }
+       
+       public void onCreate() {
+               for (SamzaSystemStream stream:systemStreams) {
+                       stream.initSystemStream();
+               }
+       }
+       
+       /*
+        * Implement Stream interface
+        */
+       @Override
+       public void put(ContentEvent event) {
+               for (SamzaSystemStream stream:systemStreams) {
+                       stream.send(collector,event);
+               }
+       }
+       
+       public List<SamzaSystemStream> getSystemStreams() {
+               return this.systemStreams;
+       }
+       
+       /**
+        * SamzaSystemStream wraps a Samza SystemStream.
+        * It holds the information needed to set up a Samza stream while the
+        * topology is being constructed, and creates the actual Samza stream
+        * when the topology is submitted (by invoking initSystemStream()).
+        * 
+        * @author Anh Thu Vu
+        */
+       public static class SamzaSystemStream implements Serializable {
+               /**
+                * 
+                */
+               private static final long serialVersionUID = 1L;
+               private String system;
+               private String stream;
+               private PartitioningScheme scheme;
+               private int parallelism;
+               
+               private transient SystemStream actualSystemStream = null;
+               
+               /*
+                * Constructors
+                */
+               public SamzaSystemStream(String system, String stream, PartitioningScheme scheme, int parallelism) {
+                       this.system = system;
+                       this.stream = stream;
+                       this.scheme = scheme;
+                       this.parallelism = parallelism;
+               }
+               
+               public SamzaSystemStream(String system, String stream, PartitioningScheme scheme) {
+                       this(system, stream, scheme, 1);
+               }
+               
+               /*
+                * Setters
+                */
+               public void setSystem(String system) {
+                       this.system = system;
+               }
+               
+               /*
+                * Getters
+                */
+               public String getSystem() {
+                       return this.system;
+               }
+               
+               public String getStream() {
+                       return this.stream;
+               }
+               
+               public PartitioningScheme getPartitioningScheme() {
+                       return this.scheme;
+               }
+               
+               public int getParallelism() {
+                       return this.parallelism;
+               }
+
+               public boolean isSame(PartitioningScheme scheme, int parallelismHint) {
+                       return (this.scheme == scheme && this.parallelism == parallelismHint);
+               }
+               
+               /*
+                * Init the actual Samza stream
+                */
+               public void initSystemStream() {
+                       actualSystemStream = new SystemStream(this.system, this.stream);
+               }
+               
+               /*
+                * Send a ContentEvent
+                */
+               public void send(MessageCollector collector, ContentEvent contentEvent) {
+                       if (actualSystemStream == null) 
+                               this.initSystemStream();
+                       
+                       switch(this.scheme) {
+                       case SHUFFLE:
+                               this.sendShuffle(collector, contentEvent);
+                               break;
+                       case GROUP_BY_KEY:
+                               this.sendGroupByKey(collector, contentEvent);
+                               break;
+                       case BROADCAST:
+                               this.sendBroadcast(collector, contentEvent);
+                               break;
+                       }
+               }
+               
+               /*
+                * Helpers
+                */
+               private synchronized void sendShuffle(MessageCollector collector, ContentEvent event) {
+                       collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event));
+               }
+               
+               private void sendGroupByKey(MessageCollector collector, ContentEvent event) {
+                       collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, event.getKey(), null, event));
+               }
+
+               private synchronized void sendBroadcast(MessageCollector collector, ContentEvent event) {
+                       for (int i=0; i<parallelism; i++) {
+                               collector.send(new OutgoingMessageEnvelope(this.actualSystemStream, i, null, event));
+                       }
+               }       
+       }
+}
\ No newline at end of file
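
For reference, a minimal usage sketch of the inner SamzaSystemStream class added above (not part of this commit; the MessageCollector and ContentEvent are assumed to come from the surrounding Samza task, and the topic name is a placeholder):

    // Declare a shuffle-partitioned stream over 4 partitions
    SamzaStream.SamzaSystemStream systemStream =
            new SamzaStream.SamzaSystemStream("kafka", "sample-topology-0-0", PartitioningScheme.SHUFFLE, 4);
    systemStream.initSystemStream();            // builds the underlying SystemStream("kafka", "sample-topology-0-0")
    systemStream.send(collector, contentEvent); // wraps the event in an OutgoingMessageEnvelope and sends it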

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
new file mode 100644
index 0000000..a169bc2
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaTopology.java
@@ -0,0 +1,64 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+
+/**
+ * Topology for Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaTopology extends AbstractTopology {
+       private int procItemCounter;
+       
+       public SamzaTopology(String topoName) {
+               super(topoName);
+               procItemCounter = 0;
+       }
+       
+       @Override
+       public void addProcessingItem(IProcessingItem procItem, int parallelism) {
+               super.addProcessingItem(procItem, parallelism);
+               SamzaProcessingNode samzaPi = (SamzaProcessingNode) procItem;
+               samzaPi.setName(this.getTopologyName()+"-"+Integer.toString(procItemCounter));
+               procItemCounter++;
+       }
+       
+       /*
+        * Gets the set of ProcessingItems, excluding EntrancePIs.
+        * Used by SamzaConfigFactory, since the configs for EntrancePIs and
+        * normal PIs are different.
+        */
+       public Set<IProcessingItem> getNonEntranceProcessingItems() throws Exception {
+               Set<IProcessingItem> copiedSet = new HashSet<IProcessingItem>();
+               copiedSet.addAll(this.getProcessingItems());
+               boolean result = copiedSet.removeAll(this.getEntranceProcessingItems());
+               if (!result) {
+                       throw new Exception("Failed to extract the set of non-entrance processing items");
+               }
+               return copiedSet;
+       }
+}
\ No newline at end of file
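
An illustrative naming sketch (not part of this commit; modelPi and evaluatorPi stand for existing SamzaProcessingItem instances):

    SamzaTopology topology = new SamzaTopology("vht");
    topology.addProcessingItem(modelPi, 4);      // modelPi is named "vht-0"
    topology.addProcessingItem(evaluatorPi, 1);  // evaluatorPi is named "vht-1"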

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
new file mode 100644
index 0000000..56427d0
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaConfigFactory.java
@@ -0,0 +1,532 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.local.LocalJobFactory;
+import org.apache.samza.job.yarn.YarnJobFactory;
+import org.apache.samza.system.kafka.KafkaSystemFactory;
+
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.impl.SamoaSystemFactory;
+import com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.impl.SamzaProcessingItem;
+import com.yahoo.labs.samoa.topology.impl.SamzaStream;
+import com.yahoo.labs.samoa.topology.impl.SamzaTopology;
+import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
+
+/**
+ * Generate Configs that will be used to submit Samza jobs
+ * from the input topology (one config per PI/EntrancePI in the topology)
+ * 
+ * @author Anh Thu Vu
+ *
+ */
+public class SamzaConfigFactory {
+       public static final String SYSTEM_NAME = "samoa";
+       
+       // DEFAULT VALUES
+       private static final String DEFAULT_ZOOKEEPER = "localhost:2181";
+       private static final String DEFAULT_BROKER_LIST = "localhost:9092";
+
+       // DELIMITERS
+       public static final String COMMA = ",";
+       public static final String COLON = ":";
+       public static final String DOT = ".";
+       public static final char DOLLAR_SIGN = '$';
+       public static final char QUESTION_MARK = '?';
+       
+       // PARTITIONING SCHEMES
+       public static final String SHUFFLE = "shuffle";
+       public static final String KEY = "key";
+       public static final String BROADCAST = "broadcast";
+
+       // PROPERTY KEYS
+       // JOB
+       public static final String JOB_FACTORY_CLASS_KEY = "job.factory.class";
+       public static final String JOB_NAME_KEY = "job.name";
+       // YARN 
+       public static final String YARN_PACKAGE_KEY = "yarn.package.path";
+       public static final String CONTAINER_MEMORY_KEY = "yarn.container.memory.mb";
+       public static final String AM_MEMORY_KEY = "yarn.am.container.memory.mb";
+       public static final String CONTAINER_COUNT_KEY = "yarn.container.count";
+       // TASK (SAMZA original)
+       public static final String TASK_CLASS_KEY = "task.class";
+       public static final String TASK_INPUTS_KEY = "task.inputs";
+       // TASK (extra)
+       public static final String FILE_KEY = "task.processor.file";
+       public static final String FILESYSTEM_KEY = "task.processor.filesystem";
+       public static final String ENTRANCE_INPUT_KEY = "task.entrance.input";
+       public static final String ENTRANCE_OUTPUT_KEY = "task.entrance.outputs";
+       public static final String YARN_CONF_HOME_KEY = "yarn.config.home";
+       // KAFKA
+       public static final String ZOOKEEPER_URI_KEY = "consumer.zookeeper.connect";
+       public static final String BROKER_URI_KEY = "producer.metadata.broker.list";
+       public static final String KAFKA_BATCHSIZE_KEY = "producer.batch.num.messages";
+       public static final String KAFKA_PRODUCER_TYPE_KEY = "producer.producer.type";
+       // SERDE
+       public static final String SERDE_REGISTRATION_KEY = "kryo.register";
+
+       // Instance variables
+       private boolean isLocalMode;
+       private String zookeeper;
+       private String kafkaBrokerList;
+       private int replicationFactor;
+       private int amMemory;
+       private int containerMemory;
+       private int piPerContainerRatio;
+       private int checkpointFrequency; // in ms
+
+       private String jarPath;
+       private String kryoRegisterFile = null;
+
+       public SamzaConfigFactory() {
+               this.isLocalMode = false;
+               this.zookeeper = DEFAULT_ZOOKEEPER;
+               this.kafkaBrokerList = DEFAULT_BROKER_LIST;
+               this.checkpointFrequency = 60000; // default: 1 minute
+               this.replicationFactor = 1;
+       }
+
+       /*
+        * Setter methods
+        */
+       public SamzaConfigFactory setYarnPackage(String packagePath) {
+               this.jarPath = packagePath;
+               return this;
+       }
+
+       public SamzaConfigFactory setLocalMode(boolean isLocal) {
+               this.isLocalMode = isLocal;
+               return this;
+       }
+
+       public SamzaConfigFactory setZookeeper(String zk) {
+               this.zookeeper = zk;
+               return this;
+       }
+
+       public SamzaConfigFactory setKafka(String brokerList) {
+               this.kafkaBrokerList = brokerList;
+               return this;
+       }
+       
+       public SamzaConfigFactory setCheckpointFrequency(int freq) {
+               this.checkpointFrequency = freq;
+               return this;
+       }
+       
+       public SamzaConfigFactory setReplicationFactor(int replicationFactor) {
+               this.replicationFactor = replicationFactor;
+               return this;
+       }
+
+       public SamzaConfigFactory setAMMemory(int mem) {
+               this.amMemory = mem;
+               return this;
+       }
+
+       public SamzaConfigFactory setContainerMemory(int mem) {
+               this.containerMemory = mem;
+               return this;
+       }
+       
+       public SamzaConfigFactory setPiPerContainerRatio(int piPerContainer) {
+               this.piPerContainerRatio = piPerContainer;
+               return this;
+       }
+
+       public SamzaConfigFactory setKryoRegisterFile(String kryoRegister) {
+               this.kryoRegisterFile = kryoRegister;
+               return this;
+       }
+
+       /*
+        * Generate a map of all config properties for the input SamzaProcessingItem
+        */
+       private Map<String,String> getMapForPI(SamzaProcessingItem pi, String filename, String filesystem) throws Exception {
+               Map<String,String> map = getBasicSystemConfig();
+
+               // Set job name, task class, task inputs (from SamzaProcessingItem)
+               setJobName(map, pi.getName());
+               setTaskClass(map, SamzaProcessingItem.class.getName());
+
+               StringBuilder streamNames = new StringBuilder();
+               boolean first = true;
+               for(SamzaSystemStream stream:pi.getInputStreams()) {
+                       if (!first) streamNames.append(COMMA);
+                       streamNames.append(stream.getSystem()+DOT+stream.getStream());
+                       if (first) first = false;
+               }
+               setTaskInputs(map, streamNames.toString());
+
+               // Processor file
+               setFileName(map, filename);
+               setFileSystem(map, filesystem);
+               
+               List<String> nameList = new ArrayList<String>();
+               // Default kafka system: kafka0: sync producer
+               // This system is always required: it is used for checkpointing
+               nameList.add("kafka0");
+               setKafkaSystem(map, "kafka0", this.zookeeper, this.kafkaBrokerList, 1);
+               // Output streams: set kafka systems
+               for (SamzaStream stream:pi.getOutputStreams()) {
+                       boolean found = false;
+                       for (String name:nameList) {
+                               if (stream.getSystemName().equals(name)) {
+                                       found = true;
+                                       break;
+                               }
+                       }
+                       if (!found) {
+                               nameList.add(stream.getSystemName());
+                               setKafkaSystem(map, stream.getSystemName(), this.zookeeper, this.kafkaBrokerList, stream.getBatchSize());
+                       }
+               }
+               // Input streams: set kafka systems
+               for (SamzaSystemStream stream:pi.getInputStreams()) {
+                       boolean found = false;
+                       for (String name:nameList) {
+                               if (stream.getSystem().equals(name)) {
+                                       found = true;
+                                       break;
+                               }
+                       }
+                       if (!found) {
+                               nameList.add(stream.getSystem());
+                               setKafkaSystem(map, stream.getSystem(), this.zookeeper, this.kafkaBrokerList, 1);
+                       }
+               }
+               
+               // Checkpointing
+               setValue(map,"task.checkpoint.factory","org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+               setValue(map,"task.checkpoint.system","kafka0");
+               setValue(map,"task.commit.ms","1000");
+               setValue(map,"task.checkpoint.replication.factor",Integer.toString(this.replicationFactor));
+               
+               // Number of containers
+               setNumberOfContainers(map, pi.getParallelism(), this.piPerContainerRatio);
+
+               return map;
+       }
+
+       /*
+        * Generate a map of all config properties for the input SamzaEntranceProcessingItem
+        */
+       public Map<String,String> getMapForEntrancePI(SamzaEntranceProcessingItem epi, String filename, String filesystem) {
+               Map<String,String> map = getBasicSystemConfig();
+
+               // Set job name, task class (from SamzaEntranceProcessingItem)
+               setJobName(map, epi.getName());
+               setTaskClass(map, SamzaEntranceProcessingItem.class.getName());
+
+               // Input for the entrance task (from our custom consumer)
+               setTaskInputs(map, SYSTEM_NAME+"."+epi.getName());
+
+               // Output from entrance task
+               // Since an entrance PI should have only 1 output stream,
+               // there is no need to check the batch size or set different system names.
+               // The custom consumer (samoa system) does not support reading from a specific index,
+               // so no checkpointing is needed.
+               SamzaStream outputStream = (SamzaStream)epi.getOutputStream();
+               // Set samoa system factory
+               setValue(map, "systems."+SYSTEM_NAME+".samza.factory", SamoaSystemFactory.class.getName());
+               // Set Kafka system (only if there is an output stream)
+               if (outputStream != null)
+                       setKafkaSystem(map, outputStream.getSystemName(), this.zookeeper, this.kafkaBrokerList, outputStream.getBatchSize());
+
+               // Processor file
+               setFileName(map, filename);
+               setFileSystem(map, filesystem);
+               
+               // Number of containers
+               setNumberOfContainers(map, 1, this.piPerContainerRatio);
+
+               return map;
+       }
+
+       /*
+        * Generate a list of maps (of config properties) for all PIs and EPIs in
+        * the input topology
+        */
+       public List<Map<String,String>> getMapsForTopology(SamzaTopology topology) throws Exception {
+
+               List<Map<String,String>> maps = new ArrayList<Map<String,String>>();
+
+               // File to write serialized objects
+               String filename = topology.getTopologyName() + ".dat";
+               Path dirPath = FileSystems.getDefault().getPath("dat");
+               Path filePath = FileSystems.getDefault().getPath(dirPath.toString(), filename);
+               String dstPath = filePath.toString();
+               String resPath;
+               String filesystem;
+               if (this.isLocalMode) {
+                       filesystem = SystemsUtils.LOCAL_FS;
+                       File dir = dirPath.toFile();
+                       if (!dir.exists()) 
+                               FileUtils.forceMkdir(dir);
+               }
+               else {
+                       filesystem = SystemsUtils.HDFS;
+               }
+
+               // Correct system name for streams
+               this.setSystemNameForStreams(topology.getStreams());
+               
+               // Add all PIs to a collection (map)
+               Map<String,Object> piMap = new HashMap<String,Object>();
+               Set<EntranceProcessingItem> entranceProcessingItems = topology.getEntranceProcessingItems();
+               Set<IProcessingItem> processingItems = topology.getNonEntranceProcessingItems();
+               for(EntranceProcessingItem epi:entranceProcessingItems) {
+                       SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
+                       piMap.put(sepi.getName(), sepi);
+               }
+               for(IProcessingItem pi:processingItems) {
+                       SamzaProcessingItem spi = (SamzaProcessingItem) pi;
+                       piMap.put(spi.getName(), spi);
+               }
+
+               // Serialize all PIs
+               boolean serialized = false;
+               if (this.isLocalMode) {
+                       serialized = SystemsUtils.serializeObjectToLocalFileSystem(piMap, dstPath);
+                       resPath = dstPath;
+               }
+               else {
+                       resPath = SystemsUtils.serializeObjectToHDFS(piMap, dstPath);
+                       serialized = resPath != null;
+               }
+
+               if (!serialized) {
+                       throw new Exception("Failed to serialize map of PIs to file");
+               }
+
+               // MapConfig for all PIs
+               for(EntranceProcessingItem epi:entranceProcessingItems) {
+                       SamzaEntranceProcessingItem sepi = (SamzaEntranceProcessingItem) epi;
+                       maps.add(this.getMapForEntrancePI(sepi, resPath, filesystem));
+               }
+               for(IProcessingItem pi:processingItems) {
+                       SamzaProcessingItem spi = (SamzaProcessingItem) pi;
+                       maps.add(this.getMapForPI(spi, resPath, filesystem));
+               }
+
+               return maps;
+       }
+
+       /**
+        * Construct a list of MapConfigs for a Topology
+        * @return the list of MapConfigs
+        * @throws Exception 
+        */
+       public List<MapConfig> getMapConfigsForTopology(SamzaTopology topology) throws Exception {
+               List<MapConfig> configs = new ArrayList<MapConfig>();
+               List<Map<String,String>> maps = this.getMapsForTopology(topology);
+               for(Map<String,String> map:maps) {
+                       configs.add(new MapConfig(map));
+               }
+               return configs;
+       }
+       
+       /*
+        * Assign a Kafka system name to each stream, one system per batch size
+        * ("kafka0" is the default system with a sync producer)
+        */
+       public void setSystemNameForStreams(Set<Stream> streams) {
+               Map<Integer, String> batchSizeMap = new HashMap<Integer, String>();
+               batchSizeMap.put(1, "kafka0"); // default system with sync producer
+               int counter = 0;
+               for (Stream stream:streams) {
+                       SamzaStream samzaStream = (SamzaStream) stream;
+                       String systemName = batchSizeMap.get(samzaStream.getBatchSize());
+                       if (systemName == null) {
+                               counter++;
+                               // Add new system
+                               systemName = "kafka"+Integer.toString(counter);
+                               batchSizeMap.put(samzaStream.getBatchSize(), systemName);
+                       }
+                       samzaStream.setSystemName(systemName);
+               }
+       }
+
+       /*
+        * Generate a map with common properties for PIs and EPI
+        */
+       private Map<String,String> getBasicSystemConfig() {
+               Map<String,String> map = new HashMap<String,String>();
+               // Job & Task
+               if (this.isLocalMode) 
+                       map.put(JOB_FACTORY_CLASS_KEY, LocalJobFactory.class.getName());
+               else {
+                       map.put(JOB_FACTORY_CLASS_KEY, YarnJobFactory.class.getName());
+
+                       // yarn
+                       map.put(YARN_PACKAGE_KEY,jarPath);
+                       map.put(CONTAINER_MEMORY_KEY, Integer.toString(this.containerMemory));
+                       map.put(AM_MEMORY_KEY, Integer.toString(this.amMemory));
+                       map.put(CONTAINER_COUNT_KEY, "1");
+                       map.put(YARN_CONF_HOME_KEY, SystemsUtils.getHadoopConfigHome());
+                       
+                       // Task opts (Heap size = 0.75 container memory) 
+                       int heapSize = (int)(0.75*this.containerMemory);
+                       map.put("task.opts", "-Xmx"+Integer.toString(heapSize)+"M -XX:+PrintGCDateStamps");
+               }
+
+
+               map.put(JOB_NAME_KEY, "");
+               map.put(TASK_CLASS_KEY, "");
+               map.put(TASK_INPUTS_KEY, "");
+
+               // register serializer
+               map.put("serializers.registry.kryo.class",SamzaKryoSerdeFactory.class.getName());
+
+               // Serde registration
+               setKryoRegistration(map, this.kryoRegisterFile);
+
+               return map;
+       }
+       
+       /*
+        * Helper methods to set different properties in the input map
+        */
+       private static void setJobName(Map<String,String> map, String jobName) {
+               map.put(JOB_NAME_KEY, jobName);
+       }
+
+       private static void setFileName(Map<String,String> map, String filename) {
+               map.put(FILE_KEY, filename);
+       }
+
+       private static void setFileSystem(Map<String,String> map, String filesystem) {
+               map.put(FILESYSTEM_KEY, filesystem);
+       }
+
+       private static void setTaskClass(Map<String,String> map, String taskClass) {
+               map.put(TASK_CLASS_KEY, taskClass);
+       }
+
+       private static void setTaskInputs(Map<String,String> map, String inputs) {
+               map.put(TASK_INPUTS_KEY, inputs);
+       }
+
+       private static void setKryoRegistration(Map<String, String> map, String kryoRegisterFile) {
+               if (kryoRegisterFile != null) {
+                       String value = readKryoRegistration(kryoRegisterFile);
+                       map.put(SERDE_REGISTRATION_KEY,  value);
+               }
+       }
+       
+       private static void setNumberOfContainers(Map<String, String> map, int parallelism, int piPerContainer) {
+               int res = parallelism / piPerContainer;
+               if (parallelism % piPerContainer != 0) res++;
+               map.put(CONTAINER_COUNT_KEY, Integer.toString(res));
+       }
+       
+       private static void setKafkaSystem(Map<String,String> map, String systemName, String zk, String brokers, int batchSize) {
+               map.put("systems."+systemName+".samza.factory",KafkaSystemFactory.class.getName());
+               map.put("systems."+systemName+".samza.msg.serde","kryo");
+
+               map.put("systems."+systemName+"."+ZOOKEEPER_URI_KEY,zk);
+               map.put("systems."+systemName+"."+BROKER_URI_KEY,brokers);
+               map.put("systems."+systemName+"."+KAFKA_BATCHSIZE_KEY,Integer.toString(batchSize));
+
+               map.put("systems."+systemName+".samza.offset.default","oldest");
+
+               if (batchSize > 1) {
+                       map.put("systems."+systemName+"."+KAFKA_PRODUCER_TYPE_KEY,"async");
+               }
+               else {
+                       map.put("systems."+systemName+"."+KAFKA_PRODUCER_TYPE_KEY,"sync");
+               }
+       }
+       
+       // Set custom properties
+       private static void setValue(Map<String,String> map, String key, String value) {
+               map.put(key,value);
+       }
+
+       /*
+        * Helper method to parse Kryo registration file
+        */
+       private static String readKryoRegistration(String filePath) {
+               FileInputStream fis = null;
+               Properties props = new Properties();
+               StringBuilder result = new StringBuilder();
+               try {
+                       fis = new FileInputStream(filePath);
+                       props.load(fis);
+
+                       boolean first = true;
+                       String value = null;
+                       for(String k:props.stringPropertyNames()) {
+                               if (!first)
+                                       result.append(COMMA);
+                               else
+                                       first = false;
+                               
+                               // Need to avoid the dollar sign, as Samza passes all the properties in
+                               // the config to containers via command-line parameters/environment variables.
+                               // We could escape the dollar sign, but that is more complicated than
+                               // replacing it with something else
+                               result.append(k.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
+                               value = props.getProperty(k);
+                               if (value != null && value.trim().length() > 0) {
+                                       result.append(COLON);
+                                       result.append(value.trim().replace(DOLLAR_SIGN, QUESTION_MARK));
+                               }
+                       }
+               } catch (FileNotFoundException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               } catch (IOException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               } finally {
+                       if (fis != null)
+                               try {
+                                       fis.close();
+                               } catch (IOException e) {
+                                       // TODO Auto-generated catch block
+                                       e.printStackTrace();
+                               }
+               }
+
+               return result.toString();
+       }
+}
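
For context, a sketch of how the factory above might be driven (not part of this commit; the ZooKeeper/broker addresses, package path, file names and memory sizes are placeholder values):

    SamzaConfigFactory factory = new SamzaConfigFactory()
            .setLocalMode(false)
            .setZookeeper("localhost:2181")
            .setKafka("localhost:9092")
            .setYarnPackage("hdfs://namenode/samoa/samoa-samza-dist.tar.gz") // placeholder path
            .setAMMemory(1024)
            .setContainerMemory(1024)
            .setPiPerContainerRatio(2)
            .setKryoRegisterFile("kryo-register.properties");                // placeholder file
    List<MapConfig> configs = factory.getMapConfigsForTopology(topology);    // one MapConfig per PI/EPI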

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
new file mode 100644
index 0000000..8e9e446
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SamzaKryoSerdeFactory.java
@@ -0,0 +1,155 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Implementation of Samza's SerdeFactory
+ * that uses Kryo to serialize/deserialize objects
+ * 
+ * @author Anh Thu Vu
+ * @param <T>
+ *
+ */
+public class SamzaKryoSerdeFactory<T> implements SerdeFactory<T> {
+       
+       private static final Logger logger = LoggerFactory.getLogger(SamzaKryoSerdeFactory.class);
+       
+       public static class SamzaKryoSerde<V> implements Serde<V> {
+               private Kryo kryo;
+               
+               public SamzaKryoSerde (String registrationInfo) {
+                       this.kryo = new Kryo();
+                       this.register(registrationInfo);
+               }
+               
+               @SuppressWarnings({ "rawtypes", "unchecked" })
+               private void register(String registrationInfo) {
+                       if (registrationInfo == null) return;
+                       
+                       String[] infoList = registrationInfo.split(SamzaConfigFactory.COMMA);
+                       
+                       Class targetClass = null;
+                       Class serializerClass = null;
+                       Serializer serializer = null;
+                       
+                       for (String info:infoList) {
+                               String[] fields = info.split(SamzaConfigFactory.COLON);
+                               
+                               targetClass = null;
+                               serializerClass = null;
+                               if (fields.length > 0) {
+                                       try {
+                                               targetClass = Class.forName(fields[0].replace(SamzaConfigFactory.QUESTION_MARK, SamzaConfigFactory.DOLLAR_SIGN));
+                                       } catch (ClassNotFoundException e) {
+                                               // TODO Auto-generated catch block
+                                               e.printStackTrace();
+                                       }
+                               }
+                               if (fields.length > 1) {
+                                       try {
+                                               serializerClass = Class.forName(fields[1].replace(SamzaConfigFactory.QUESTION_MARK, SamzaConfigFactory.DOLLAR_SIGN));
+                                       } catch (ClassNotFoundException e) {
+                                               // TODO Auto-generated catch block
+                                               e.printStackTrace();
+                                       }
+                               }
+                               
+                               if (targetClass != null) {
+                                       if (serializerClass == null) {
+                                               kryo.register(targetClass);
+                                       }
+                                       else {
+                                               serializer = resolveSerializerInstance(kryo, targetClass, (Class<? extends Serializer>)serializerClass);
+                                               kryo.register(targetClass, serializer);
+                                       }
+                               }
+                               else {
+                                       logger.info("Invalid registration info:{}",info);
+                               }
+                       }
+               }
+               
+               @SuppressWarnings("rawtypes")
+               private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass) {
+               try {
+                   try {
+                       return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
+                   } catch (Exception ex1) {
+                       try {
+                           return serializerClass.getConstructor(Kryo.class).newInstance(k);
+                       } catch (Exception ex2) {
+                           try {
+                               return serializerClass.getConstructor(Class.class).newInstance(superClass);
+                           } catch (Exception ex3) {
+                               return serializerClass.newInstance();
+                           }
+                       }
+                   }
+               } catch (Exception ex) {
+                   throw new IllegalArgumentException("Unable to create serializer \""
+                                                      + serializerClass.getName()
+                                                      + "\" for class: "
+                                                      + superClass.getName(), ex);
+               }
+           }
+               
+               /*
+                * Implement Samza Serde interface
+                */
+               @Override
+               public byte[] toBytes(V obj) {
+                       ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                       Output output = new Output(bos);
+                       kryo.writeClassAndObject(output, obj);
+                       output.flush();
+                       output.close();
+                       return bos.toByteArray();
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public V fromBytes(byte[] byteArr) {
+                       Input input = new Input(byteArr);
+                       Object obj = kryo.readClassAndObject(input);
+                       input.close();
+                       return (V)obj;
+               }
+               
+       }
+
+       @Override
+       public Serde<T> getSerde(String name, Config config) {
+               return new SamzaKryoSerde<T>(config.get(SamzaConfigFactory.SERDE_REGISTRATION_KEY));
+       }
+}
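
For reference, a sketch of the registration string consumed by SamzaKryoSerde (not part of this commit): entries are comma-separated, each is a class name optionally followed by ':' and a Kryo serializer class, with '$' in inner-class names encoded as '?' by SamzaConfigFactory. com.example.MyEvent and someEvent are placeholders:

    Serde<Object> serde = new SamzaKryoSerdeFactory.SamzaKryoSerde<Object>(
            "com.yahoo.labs.samoa.core.ContentEvent,"
            + "com.example.MyEvent:com.yahoo.labs.samoa.utils.SerializableSerializer");
    byte[] bytes = serde.toBytes(someEvent);   // someEvent: an instance of a registered class
    Object restored = serde.fromBytes(bytes);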

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
new file mode 100644
index 0000000..4bdbafd
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SerializableSerializer.java
@@ -0,0 +1,70 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serialize and deserialize objects with Java serialization
+ * 
+ * @author Anh Thu Vu
+ */
+public class SerializableSerializer extends Serializer<Object> {
+       @Override
+    public void write(Kryo kryo, Output output, Object object) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try {
+            ObjectOutputStream oos = new ObjectOutputStream(bos);
+            oos.writeObject(object);
+            oos.flush();
+        } catch(IOException e) {
+            throw new RuntimeException(e);
+        }
+        byte[] ser = bos.toByteArray();
+        output.writeInt(ser.length);
+        output.writeBytes(ser);
+    }
+    
+    @SuppressWarnings("rawtypes")
+       @Override
+    public Object read(Kryo kryo, Input input, Class c) {
+        int len = input.readInt();
+        byte[] ser = new byte[len];
+        input.readBytes(ser);
+        ByteArrayInputStream bis = new ByteArrayInputStream(ser);
+        try {
+            ObjectInputStream ois = new ObjectInputStream(bis);
+            return ois.readObject();
+        } catch(Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
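
A small usage sketch (not part of this commit; LegacyModel stands for any java.io.Serializable class without a dedicated Kryo serializer):

    Kryo kryo = new Kryo();
    // Fall back to Java serialization for instances of this class
    kryo.register(LegacyModel.class, new SerializableSerializer());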

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
new file mode 100644
index 0000000..367f9f9
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/utils/SystemsUtils.java
@@ -0,0 +1,383 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.file.FileSystems;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for:
+ * - Kafka
+ * - HDFS
+ * - Handling files on local FS
+ * 
+ * @author Anh Thu Vu
+ */
+public class SystemsUtils {
+       private static final Logger logger = LoggerFactory.getLogger(SystemsUtils.class);
+       
+       public static final String HDFS = "hdfs";
+       public static final String LOCAL_FS = "local";
+       
+       private static final String TEMP_FILE = "samoaTemp";
+       private static final String TEMP_FILE_SUFFIX = ".dat";
+       
+       /*
+        * Kafka
+        */
+       private static class KafkaUtils {
+               private static ZkClient zkClient;
+               
+               static void setZookeeper(String zk) {
+                       zkClient = new ZkClient(zk, 30000, 30000, new ZKStringSerializerWrapper());
+               }
+
+               /*
+                * Create Kafka topic/stream
+                */
+               static void createKafkaTopic(String name, int partitions, int replicas) {
+                       AdminUtils.createTopic(zkClient, name, partitions, replicas, new Properties());
+               }
+               
+               static class ZKStringSerializerWrapper implements ZkSerializer {
+                       @Override
+                       public Object deserialize(byte[] byteArray) throws ZkMarshallingError {
+                               return ZKStringSerializer.deserialize(byteArray);
+                       }
+
+                       @Override
+                       public byte[] serialize(Object obj) throws ZkMarshallingError {
+                               return ZKStringSerializer.serialize(obj);
+                       }
+               }
+       }
+       
+       /*
+        * HDFS
+        */
+       private static class HDFSUtils {
+               private static String coreConfPath;
+               private static String hdfsConfPath;
+               private static String configHomePath;
+               private static String samoaDir = null;
+               
+               static void setHadoopConfigHome(String hadoopConfPath) {
+                       logger.info("Hadoop config home:{}",hadoopConfPath);
+                       configHomePath = hadoopConfPath;
+                       java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "core-site.xml");
+                       java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopConfPath, "hdfs-site.xml");
+                       coreConfPath = coreSitePath.toAbsolutePath().toString();
+                       hdfsConfPath = hdfsSitePath.toAbsolutePath().toString();
+               }
+               
+               static String getNameNodeUri() {
+                       Configuration config = new Configuration();
+                       config.addResource(new Path(coreConfPath));
+                       config.addResource(new Path(hdfsConfPath));
+                       
+                       return config.get("fs.defaultFS");
+               }
+               
+               static String getHadoopConfigHome() {
+                       return configHomePath;
+               }
+               
+               static void setSAMOADir(String dir) {
+                       if (dir != null)
+                               samoaDir = getNameNodeUri()+dir;
+                       else 
+                               samoaDir = null;
+               }
+               
+               static String getDefaultSAMOADir() throws IOException {
+                       Configuration config = new Configuration();
+                       config.addResource(new Path(coreConfPath));
+                       config.addResource(new Path(hdfsConfPath));
+                       
+                       FileSystem fs = FileSystem.get(config);
+                       Path defaultDir = new Path(fs.getHomeDirectory(),".samoa");
+                       return defaultDir.toString();
+               }
+               
+               static boolean deleteFileIfExist(String absPath) {
+                       Path p = new Path(absPath);
+                       return deleteFileIfExist(p);
+               }
+               
+               static boolean deleteFileIfExist(Path p) {
+                       Configuration config = new Configuration();
+                       config.addResource(new Path(coreConfPath));
+                       config.addResource(new Path(hdfsConfPath));
+                       
+                       FileSystem fs;
+                       try {
+                               fs = FileSystem.get(config);
+                               if (fs.exists(p)) {
+                                       return fs.delete(p, false);
+                               }
+                               else 
+                                       return true;
+                       } catch (IOException e) {
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
+                       }
+                       return false;
+               }
+               
+               /*
+                * Write to HDFS
+                */
+               static String writeToHDFS(File file, String dstPath) {
+                       Configuration config = new Configuration();
+                       config.addResource(new Path(coreConfPath));
+                       config.addResource(new Path(hdfsConfPath));
+                       logger.info("Filesystem name:{}",config.get("fs.defaultFS"));
+                       
+                       // Default samoaDir
+                       if (samoaDir == null) {
+                               try {
+                                       samoaDir = getDefaultSAMOADir();
+                               }
+                               catch (IOException e) {
+                                       e.printStackTrace();
+                                       return null;
+                               }
+                       }
+                       
+                       // Setup src and dst paths
+                       //java.nio.file.Path tempPath = FileSystems.getDefault().getPath(samoaDir, dstPath);
+                       Path dst = new Path(samoaDir,dstPath);
+                       Path src = new Path(file.getAbsolutePath());
+                       
+                       // Delete file if already exists in HDFS
+                       if (deleteFileIfExist(dst) == false)
+                               return null;
+                       
+                       // Copy to HDFS
+                       FileSystem fs;
+                       try {
+                               fs = FileSystem.get(config);
+                               fs.copyFromLocalFile(src, dst);
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                               return null;
+                       }
+                       
+                       return dst.toString(); // abs path to file
+               }
+               
+               /*
+                * Read from HDFS
+                */
+               static Object deserializeObjectFromFile(String filePath) {
+                       logger.info("Deserialize HDFS file:{}",filePath);
+                       Configuration config = new Configuration();
+                       config.addResource(new Path(coreConfPath));
+                       config.addResource(new Path(hdfsConfPath));
+                       
+                       Path file = new Path(filePath);
+                       FSDataInputStream dataInputStream = null;
+                       ObjectInputStream ois = null;
+                       Object obj = null;
+                       FileSystem fs;
+                       try {
+                               fs = FileSystem.get(config);
+                               dataInputStream = fs.open(file);
+                               ois = new ObjectInputStream(dataInputStream);
+                               obj = ois.readObject();
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       } catch (ClassNotFoundException e) {
+                               e.printStackTrace();
+                       } finally {
+                               // close the streams on every path; closing the ObjectInputStream
+                               // also closes the underlying FSDataInputStream
+                               try {
+                                       if (ois != null) ois.close();
+                                       else if (dataInputStream != null) dataInputStream.close();
+                               } catch (IOException ioException) {
+                                       ioException.printStackTrace();
+                               }
+                       }
+                       
+                       return obj;
+               }
+               
+       }
+       
+       private static class LocalFileSystemUtils {
+               static boolean serializObjectToFile(Object obj, String fn) {
+                       FileOutputStream fos = null;
+                       ObjectOutputStream oos = null;
+                       try {
+                               fos = new FileOutputStream(fn);
+                               oos = new ObjectOutputStream(fos);
+                               oos.writeObject(obj);
+                       } catch (FileNotFoundException e) {
+                               e.printStackTrace();
+                               return false;
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                               return false;
+                       } finally {
+                               try {
+                                       // close the ObjectOutputStream first so its buffer is flushed;
+                                       // this also closes the underlying FileOutputStream
+                                       if (oos != null) oos.close();
+                                       else if (fos != null) fos.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+
+                       return true;
+               }
+       
+               static Object deserializeObjectFromLocalFile(String filename) {
+                       logger.info("Deserialize local file:{}",filename);
+                       FileInputStream fis = null;
+                       ObjectInputStream ois = null;
+                       Object obj = null;
+                       try {
+                               fis = new FileInputStream(filename);
+                               ois = new ObjectInputStream(fis);
+                               obj = ois.readObject();
+                       } catch (IOException e) {
+                               // TODO auto-generated catch block
+                               e.printStackTrace();
+                       } catch (ClassNotFoundException e) {
+                               // TODO Auto-generated catch block
+                               e.printStackTrace();
+                       } finally {
+                               try {
+                                       // closing the ObjectInputStream also closes the underlying FileInputStream
+                                       if (ois != null) ois.close();
+                                       else if (fis != null) fis.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+
+                       return obj;
+               }
+       }
+       
+       
+       
+       /*
+        * Create streams
+        */
+       public static void createKafkaTopic(String name, int partitions) {
+               createKafkaTopic(name, partitions, 1);
+       }
+       
+       public static void createKafkaTopic(String name, int partitions, int replicas) {
+               KafkaUtils.createKafkaTopic(name, partitions, replicas);
+       }
+       
+       /*
+        * Serialize object
+        */
+       public static boolean serializeObjectToLocalFileSystem(Object object, String path) {
+               return LocalFileSystemUtils.serializObjectToFile(object, path);
+       }
+       
+       public static String serializeObjectToHDFS(Object object, String path) {
+               File tmpDatFile;
+               try {
+                       tmpDatFile = File.createTempFile(TEMP_FILE, TEMP_FILE_SUFFIX);
+                       if (serializeObjectToLocalFileSystem(object, tmpDatFile.getAbsolutePath())) {
+                               return HDFSUtils.writeToHDFS(tmpDatFile, path);
+                       }
+               } catch (IOException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+               return null;
+       }
+       
+       /*
+        * Deserialize object
+        */
+       @SuppressWarnings("unchecked")
+       public static Map<String,Object> deserializeMapFromFile(String filesystem, String filename) {
+               Map<String,Object> map;
+               if (filesystem.equals(HDFS)) {
+                       map = (Map<String,Object>) HDFSUtils.deserializeObjectFromFile(filename);
+               }
+               else {
+                       map = (Map<String,Object>) LocalFileSystemUtils.deserializeObjectFromLocalFile(filename);
+               }
+               return map;
+       }
+       
+       public static Object deserializeObjectFromFileAndKey(String filesystem, String filename, String key) {
+               Map<String,Object> map = deserializeMapFromFile(filesystem, filename);
+               if (map == null) return null;
+               return map.get(key);
+       }
+       
+       /*
+        * Setup
+        */
+       public static void setZookeeper(String zookeeper) {
+               KafkaUtils.setZookeeper(zookeeper);
+       }
+       
+       public static void setHadoopConfigHome(String hadoopHome) {
+               HDFSUtils.setHadoopConfigHome(hadoopHome);
+       }
+       
+       public static void setSAMOADir(String samoaDir) {
+               HDFSUtils.setSAMOADir(samoaDir);
+       }
+       
+       /*
+        * Others
+        */
+       public static String getHDFSNameNodeUri() {
+               return HDFSUtils.getNameNodeUri();
+       }
+       public static String getHadoopConfigHome() {
+               return HDFSUtils.getHadoopConfigHome();
+       }
+       
+       public static String copyToHDFS(File file, String dstPath) {
+               return HDFSUtils.writeToHDFS(file, dstPath);
+       }
+}
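
A minimal usage sketch of the public helpers above, written against the methods visible in this file. The class name SystemsUtils and the "hdfs" filesystem flag are assumptions made for the sketch; the actual class declaration and the HDFS constant appear earlier in this diff.

    // Point the helpers at the local Hadoop/ZooKeeper setup (paths and addresses are placeholders).
    SystemsUtils.setHadoopConfigHome("/etc/hadoop/conf");
    SystemsUtils.setZookeeper("localhost:2181");

    // Create a Kafka topic with 4 partitions and the default replication factor of 1.
    SystemsUtils.createKafkaTopic("samoa-input", 4);

    // Serialize a map of job state to HDFS, then read one entry back by key.
    Map<String, Object> state = new HashMap<String, Object>();
    state.put("topology.name", "sample-topology");
    String hdfsPath = SystemsUtils.serializeObjectToHDFS(state, "job-state.dat");
    Object name = SystemsUtils.deserializeObjectFromFileAndKey("hdfs", hdfsPath, "topology.name");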

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/resources/log4j.xml 
b/samoa-samza/src/main/resources/log4j.xml
new file mode 100644
index 0000000..86451cf
--- /dev/null
+++ b/samoa-samza/src/main/resources/log4j.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 - 2014 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+  
+  <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+     <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+     <param name="DatePattern" value="'.'yyyy-MM-dd" />
+     <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+     </layout>
+  </appender>
+  
+  <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+     <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d [%t] %-5p %c (%F:%L) - %m%n"/>
+     </layout>
+  </appender>
+
+  <category name="com.yahoo.labs" additivity="false">
+     <priority value="info"/>
+     <appender-ref ref="CONSOLE"/>
+  </category>
+  
+  <root>
+    <priority value="info" />
+    <appender-ref ref="RollingAppender"/>
+  </root>
+</log4j:configuration>
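
A note on the two ${...} placeholders above: log4j 1.x resolves them from JVM system properties, so samza.log.dir and samza.container.name must be defined before log4j initializes. In a deployed job the Samza container environment is expected to supply them; for a local test they could be set by hand, for example:

    // Illustrative only: supply the placeholders manually before any logging happens.
    System.setProperty("samza.log.dir", "/tmp/samoa-logs");
    System.setProperty("samza.container.name", "samoa-container-0");
    // log4j then writes to /tmp/samoa-logs/samoa-container-0.log via the RollingAppender.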

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-storm/pom.xml b/samoa-storm/pom.xml
new file mode 100644
index 0000000..7929593
--- /dev/null
+++ b/samoa-storm/pom.xml
@@ -0,0 +1,123 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-storm</name>
+    <description>Storm bindings for SAMOA</description>
+
+    <artifactId>samoa-storm</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <repositories>
+        <repository> <!-- repository for Storm -->
+            <id>clojars</id>
+            <url>http://clojars.org/repo/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-test</artifactId>
+            <type>test-jar</type>
+            <classifier>test-jar-with-dependencies</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>storm</groupId>
+            <artifactId>storm</artifactId>
+            <version>${storm.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>${zookeeper.storm.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j-log4j12.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- SAMOA assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    <finalName>SAMOA-Storm-${project.version}</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifestEntries>
+                            <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>
+                            <Bundle-Description>${project.description}</Bundle-Description>
+                            <Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
+                            <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>-Xmx1G</argLine>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
new file mode 100644
index 0000000..54792ae
--- /dev/null
+++ b/samoa-storm/src/main/java/com/yahoo/labs/samoa/LocalStormDoTask.java
@@ -0,0 +1,78 @@
+package com.yahoo.labs.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+import com.yahoo.labs.samoa.topology.impl.StormSamoaUtils;
+import com.yahoo.labs.samoa.topology.impl.StormTopology;
+
+/**
+ * The main class to execute a SAMOA task in LOCAL mode in Storm.
+ *
+ * @author Arinto Murdopo
+ *
+ */
+public class LocalStormDoTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class);
+
+    /**
+     * The main method.
+     *
+     * @param args the arguments
+     */
+    public static void main(String[] args) {
+
+        List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+
+        int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
+
+        args = tmpArgs.toArray(new String[0]);
+
+        //convert the arguments into Storm topology
+        StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+        String topologyName = stormTopo.getTopologyName();
+
+        Config conf = new Config();
+        //conf.putAll(Utils.readStormConfig());
+        conf.setDebug(false);
+
+        //local mode
+        conf.setMaxTaskParallelism(numWorker);
+
+        backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
+        cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology());
+
+        backtype.storm.utils.Utils.sleep(600 * 1000);
+
+        cluster.killTopology(topologyName);
+        cluster.shutdown();
+
+    }
+}
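
As a usage sketch: the arguments are the SAMOA task definition followed by the number of workers, the topology runs on an in-process LocalCluster for a fixed 600 seconds, and is then killed. The task string below and the assumption that StormSamoaUtils.numWorkers(...) consumes the trailing argument as the worker count are illustrative, not taken from this diff.

    // Hypothetical invocation; "PrequentialEvaluation -i 100000 -f 10000" is only an example task.
    LocalStormDoTask.main(new String[] {
            "PrequentialEvaluation -i 100000 -f 10000",  // SAMOA task definition (example)
            "2"                                          // assumed: parsed as the number of workers
    });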

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
new file mode 100644
index 0000000..ad9794d
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormBoltStream.java
@@ -0,0 +1,65 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * Storm stream that connects into a bolt. It wraps Storm's OutputCollector class.
+ * @author Arinto Murdopo
+ *
+ */
+class StormBoltStream extends StormStream{
+       
+       /**
+        * 
+        */
+       private static final long serialVersionUID = -5712513402991550847L;
+       
+       private OutputCollector outputCollector;
+
+       StormBoltStream(String stormComponentId){
+               super(stormComponentId);
+       }
+
+       @Override
+       public void put(ContentEvent contentEvent) {
+               outputCollector.emit(this.outputStreamId, new Values(contentEvent, contentEvent.getKey()));
+       }
+       
+       public void setCollector(OutputCollector outputCollector){
+               this.outputCollector = outputCollector;
+       }
+
+//     @Override
+//     public void setStreamId(String streamId) {
+//             // TODO Auto-generated method stub
+//             //this.outputStreamId = streamId;
+//     }
+
+       @Override
+       public String getStreamId() {
+               // TODO Auto-generated method stub
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
new file mode 100644
index 0000000..347fd50
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormComponentFactory.java
@@ -0,0 +1,90 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * Component factory implementation for samoa-storm
+ */
+public final class StormComponentFactory implements ComponentFactory {
+
+    private final Map<String, Integer> processorList;
+
+    public StormComponentFactory() {
+        processorList = new HashMap<>();
+    }
+
+    @Override
+    public ProcessingItem createPi(Processor processor) {
+        return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), 1);
+    }
+
+    @Override
+    public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+        return new StormEntranceProcessingItem(processor, this.getComponentName(processor.getClass()));
+    }
+
+    @Override
+    public Stream createStream(IProcessingItem sourcePi) {
+        StormTopologyNode stormCompatiblePi = (StormTopologyNode) sourcePi;
+        return stormCompatiblePi.createStream();
+    }
+
+    @Override
+    public Topology createTopology(String topoName) {
+        return new StormTopology(topoName);
+    }
+
+    private String getComponentName(Class<? extends Processor> clazz) {
+        StringBuilder componentName = new StringBuilder(clazz.getCanonicalName());
+        String key = componentName.toString();
+        Integer index;
+
+        if (!processorList.containsKey(key)) {
+            index = 1;
+        } else {
+            index = processorList.get(key) + 1;
+        }
+
+        processorList.put(key, index);
+
+        componentName.append('_');
+        componentName.append(index);
+
+        return componentName.toString();
+    }
+
+    @Override
+    public ProcessingItem createPi(Processor processor, int parallelism) {
+        return new StormProcessingItem(processor, this.getComponentName(processor.getClass()), parallelism);
+    }
+}
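
A rough sketch of how this factory is exercised when a SAMOA task builds its topology. The connectInputShuffleStream(...) call is an assumption about the samoa-api ProcessingItem interface and is not part of this diff; sourceProcessor and destProcessor stand for arbitrary Processor implementations.

    ComponentFactory factory = new StormComponentFactory();
    Topology topology = factory.createTopology("sample-topology");

    ProcessingItem sourcePi = factory.createPi(sourceProcessor);      // component name: <class name>_1
    ProcessingItem destPi = factory.createPi(destProcessor, 4);       // parallelism hint of 4

    Stream stream = factory.createStream(sourcePi);                   // delegates to StormTopologyNode.createStream()
    destPi.connectInputShuffleStream(stream);                         // assumed samoa-api method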

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
new file mode 100644
index 0000000..fc0630a
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormDoTask.java
@@ -0,0 +1,117 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
+/**
+ * The main class used by the samoa script to execute a SAMOA task.
+ * 
+ * @author Arinto Murdopo
+ *
+ */
+public class StormDoTask {
+       private static final Logger logger = LoggerFactory.getLogger(StormDoTask.class);
+       private static String localFlag = "local";
+       private static String clusterFlag = "cluster";
+       
+       /**
+        * The main method.
+        * 
+        * @param args the arguments
+        */
+       public static void main(String[] args) {
+
+               List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+               
+               boolean isLocal = isLocal(tmpArgs);
+               int numWorker = StormSamoaUtils.numWorkers(tmpArgs);
+               
+               args = tmpArgs.toArray(new String[0]);
+               
+               //convert the arguments into Storm topology
+               StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+               String topologyName = stormTopo.getTopologyName();
+               
+       Config conf = new Config();
+       conf.putAll(Utils.readStormConfig());
+       conf.setDebug(false);
+                       
+       
+               if(isLocal){
+                       //local mode
+                       conf.setMaxTaskParallelism(numWorker);
+                       
+                       backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
+                       cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology());
+                       
+                       backtype.storm.utils.Utils.sleep(600*1000);
+                       
+                       cluster.killTopology(topologyName);
+                       cluster.shutdown();
+                       
+               }else{
+                       //cluster mode
+                       conf.setNumWorkers(numWorker);
+                       try {
+                               backtype.storm.StormSubmitter.submitTopology(topologyName, conf,
+                                               stormTopo.getStormBuilder().createTopology());
+                       } catch (backtype.storm.generated.AlreadyAliveException ale) {
+                               ale.printStackTrace();
+                       } catch (backtype.storm.generated.InvalidTopologyException ite) {
+                               ite.printStackTrace();
+                       }
+               }
+       }
+       
+       private static boolean isLocal(List<String> tmpArgs){
+               ExecutionMode executionMode = ExecutionMode.UNDETERMINED;
+               
+               int position = tmpArgs.size() - 1;
+               String flag = tmpArgs.get(position);
+               boolean isLocal = true;
+               
+               if(flag.equals(clusterFlag)){
+                       executionMode = ExecutionMode.CLUSTER;
+                       isLocal = false;
+               }else if(flag.equals(localFlag)){
+                       executionMode = ExecutionMode.LOCAL;
+                       isLocal = true;
+               }
+               
+               if(executionMode != ExecutionMode.UNDETERMINED){
+                       tmpArgs.remove(position);
+               }
+               
+               return isLocal;
+       }
+       
+       private enum ExecutionMode {LOCAL, CLUSTER, UNDETERMINED};
+}
+               

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
new file mode 100644
index 0000000..d4d80bf
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormEntranceProcessingItem.java
@@ -0,0 +1,208 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Map;
+import java.util.UUID;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * EntranceProcessingItem implementation for Storm.
+ */
+class StormEntranceProcessingItem extends AbstractEntranceProcessingItem implements StormTopologyNode {
+    private final StormEntranceSpout piSpout;
+
+    StormEntranceProcessingItem(EntranceProcessor processor) {
+        this(processor, UUID.randomUUID().toString());
+    }
+
+    StormEntranceProcessingItem(EntranceProcessor processor, String friendlyId) {
+       super(processor);
+       this.setName(friendlyId);
+        this.piSpout = new StormEntranceSpout(processor);
+    }
+
+    @Override
+    public EntranceProcessingItem setOutputStream(Stream stream) {
+        // piSpout.streams.add(stream);
+        piSpout.setOutputStream((StormStream) stream);
+        return this;
+    }
+    
+    @Override
+    public Stream getOutputStream() {
+       return piSpout.getOutputStream();
+    }
+    
+    @Override
+    public void addToTopology(StormTopology topology, int parallelismHint) {
+        topology.getStormBuilder().setSpout(this.getName(), piSpout, parallelismHint);
+    }
+
+    @Override
+    public StormStream createStream() {
+        return piSpout.createStream(this.getName());
+    }
+
+    @Override
+    public String getId() {
+        return this.getName();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(super.toString());
+        sb.insert(0, String.format("id: %s, ", this.getName()));
+        return sb.toString();
+    }
+
+    /**
+     * Resulting Spout of StormEntranceProcessingItem
+     */
+    final static class StormEntranceSpout extends BaseRichSpout {
+
+        private static final long serialVersionUID = -9066409791668954099L;
+
+        // private final Set<StormSpoutStream> streams;
+        private final EntranceProcessor entranceProcessor;
+        private StormStream outputStream;
+
+        // private transient SpoutStarter spoutStarter;
+        // private transient Executor spoutExecutors;
+        // private transient LinkedBlockingQueue<StormTupleInfo> tupleInfoQueue;
+
+        private SpoutOutputCollector collector;
+
+        StormEntranceSpout(EntranceProcessor processor) {
+            // this.streams = new HashSet<StormSpoutStream>();
+            this.entranceProcessor = processor;
+        }
+
+        public StormStream getOutputStream() {
+            return outputStream;
+        }
+
+        public void setOutputStream(StormStream stream) {
+            this.outputStream = stream;
+        }
+
+        @Override
+        public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+            // this.tupleInfoQueue = new LinkedBlockingQueue<StormTupleInfo>();
+
+            // Processor and this class share the same instance of stream
+            // for (StormSpoutStream stream : streams) {
+            // stream.setSpout(this);
+            // }
+            // outputStream.setSpout(this);
+
+            this.entranceProcessor.onCreate(context.getThisTaskId());
+            // this.spoutStarter = new SpoutStarter(this.starter);
+
+            // this.spoutExecutors = Executors.newSingleThreadExecutor();
+            // this.spoutExecutors.execute(spoutStarter);
+        }
+
+        @Override
+        public void nextTuple() {
+            if (entranceProcessor.hasNext()) {
+                Values value = newValues(entranceProcessor.nextEvent());
+                collector.emit(outputStream.getOutputId(), value);
+            } else
+                Utils.sleep(1000);
+            // StormTupleInfo tupleInfo = tupleInfoQueue.poll(50, TimeUnit.MILLISECONDS);
+            // if (tupleInfo != null) {
+            // Values value = newValues(tupleInfo.getContentEvent());
+            // collector.emit(tupleInfo.getStormStream().getOutputId(), value);
+            // }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            // for (StormStream stream : streams) {
+            // declarer.declareStream(stream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD));
+            // }
+            declarer.declareStream(outputStream.getOutputId(), new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD, StormSamoaUtils.KEY_FIELD));
+        }
+
+        StormStream createStream(String piId) {
+            // StormSpoutStream stream = new StormSpoutStream(piId);
+            StormStream stream = new StormBoltStream(piId);
+            // streams.add(stream);
+            return stream;
+        }
+
+        // void put(StormSpoutStream stream, ContentEvent contentEvent) {
+        // tupleInfoQueue.add(new StormTupleInfo(stream, contentEvent));
+        // }
+
+        private Values newValues(ContentEvent contentEvent) {
+            return new Values(contentEvent, contentEvent.getKey());
+        }
+
+        // private final static class StormTupleInfo {
+        //
+        // private final StormStream stream;
+        // private final ContentEvent event;
+        //
+        // StormTupleInfo(StormStream stream, ContentEvent event) {
+        // this.stream = stream;
+        // this.event = event;
+        // }
+        //
+        // public StormStream getStormStream() {
+        // return this.stream;
+        // }
+        //
+        // public ContentEvent getContentEvent() {
+        // return this.event;
+        // }
+        // }
+
+        // private final static class SpoutStarter implements Runnable {
+        //
+        // private final TopologyStarter topoStarter;
+        //
+        // SpoutStarter(TopologyStarter topoStarter) {
+        // this.topoStarter = topoStarter;
+        // }
+        //
+        // @Override
+        // public void run() {
+        // this.topoStarter.start();
+        // }
+        // }
+    }
+}
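
To show how the entrance item is wired, a small sketch using only the methods visible above. The fragment assumes it lives in the same package (the constructor is package-private) and that entranceProcessor and stormTopology already exist in the calling code.

    StormEntranceProcessingItem entrancePi =
            new StormEntranceProcessingItem(entranceProcessor, "input-source");
    entrancePi.setOutputStream(entrancePi.createStream());   // nextTuple() emits on this stream's output id
    entrancePi.addToTopology(stormTopology, 1);               // registers the spout with a parallelism hint of 1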

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
new file mode 100644
index 0000000..5f86855
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormJarSubmitter.java
@@ -0,0 +1,75 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.utils.Utils;
+
+/**
+ * Utility class to submit samoa-storm jar to a Storm cluster.
+ * 
+ * @author Arinto Murdopo
+ *
+ */
+public class StormJarSubmitter {
+       
+       public final static String UPLOADED_JAR_LOCATION_KEY = "UploadedJarLocation";
+
+       /**
+        * @param args
+        * @throws IOException 
+        */
+       public static void main(String[] args) throws IOException {
+               
+               Config config = new Config();
+               config.putAll(Utils.readCommandLineOpts());
+               config.putAll(Utils.readStormConfig());
+
+               String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
+               int nimbusThriftPort = Utils.getInt(config
+                               .get(Config.NIMBUS_THRIFT_PORT));
+
+               System.out.println("Nimbus host " + nimbusHost);
+               System.out.println("Nimbus thrift port " + nimbusThriftPort);
+
+               System.out.println("uploading jar from " + args[0]);
+               String uploadedJarLocation = StormSubmitter.submitJar(config, args[0]);
+               
+               System.out.println("Uploaded jar file location: ");
+               System.out.println(uploadedJarLocation);
+               
+               Properties props = StormSamoaUtils.getProperties();
+               props.setProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY, uploadedJarLocation);
+               
+               File f = new File("src/main/resources/samoa-storm-cluster.properties");
+               f.createNewFile();
+               
+               OutputStream out = new FileOutputStream(f);
+               props.store(out, "properties file to store uploaded jar location from StormJarSubmitter");
+               out.close();
+       }
+}
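
The properties file written above is what allows a later submission step to reuse the already-uploaded jar. A sketch of the read side, assuming StormSamoaUtils.getProperties() loads the same samoa-storm-cluster.properties file (only the call itself is visible in this diff):

    Properties props = StormSamoaUtils.getProperties();
    String uploadedJarLocation = props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
    System.out.println("Previously uploaded jar: " + uploadedJarLocation);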

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
new file mode 100644
index 0000000..73879f6
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItem.java
@@ -0,0 +1,170 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.impl.StormStream.InputStreamId;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * ProcessingItem implementation for Storm.
+ * @author Arinto Murdopo
+ *
+ */
+class StormProcessingItem extends AbstractProcessingItem implements StormTopologyNode {
+       private final ProcessingItemBolt piBolt;        
+       private BoltDeclarer piBoltDeclarer;
+       
+       //TODO: should we put the parallelism hint here?
+       //IMO, the parallelism hint should only be declared when we add this PI to the topology.
+       //Open for discussion :p
+               
+       StormProcessingItem(Processor processor, int parallelismHint){
+               this(processor, UUID.randomUUID().toString(), parallelismHint);
+       }
+       
+       StormProcessingItem(Processor processor, String friendlyId, int parallelismHint){
+               super(processor, parallelismHint);
+               this.piBolt = new ProcessingItemBolt(processor);
+               this.setName(friendlyId);
+       }
+
+       @Override
+       protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
+               StormStream stormInputStream = (StormStream) inputStream;
+               InputStreamId inputId = stormInputStream.getInputId();
+               
+               switch(scheme) {
+               case SHUFFLE:
+                       piBoltDeclarer.shuffleGrouping(inputId.getComponentId(),inputId.getStreamId());
+                       break;
+               case GROUP_BY_KEY:
+                       piBoltDeclarer.fieldsGrouping(
+                                       inputId.getComponentId(), 
+                                       inputId.getStreamId(), 
+                                       new Fields(StormSamoaUtils.KEY_FIELD));
+                       break;
+               case BROADCAST:
+                       piBoltDeclarer.allGrouping(
+                                       inputId.getComponentId(), 
+                                       inputId.getStreamId());
+                       break;
+               }
+               return this;
+       }
+       
+       @Override
+       public void addToTopology(StormTopology topology, int parallelismHint) {
+               if(piBoltDeclarer != null){
+                       //TODO: throw an exception, since one PI may only belong to one topology
+               }else{
+                       TopologyBuilder stormBuilder = topology.getStormBuilder();
+                       this.piBoltDeclarer = stormBuilder.setBolt(this.getName(), 
+                                       this.piBolt, parallelismHint);
+               }       
+       }
+
+       @Override
+       public StormStream createStream() {
+               return piBolt.createStream(this.getName());
+       }
+
+       @Override
+       public String getId() {
+               return this.getName();
+       }
+       
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder(super.toString());
+               sb.insert(0, String.format("id: %s, ", this.getName()));
+               return sb.toString();
+       }
+               
+       private final static class ProcessingItemBolt extends BaseRichBolt{
+               
+               private static final long serialVersionUID = -6637673741263199198L;
+               
+               private final Set<StormBoltStream> streams;
+               private final Processor processor;
+               
+               private OutputCollector collector;
+               
+               ProcessingItemBolt(Processor processor){
+                       this.streams = new HashSet<StormBoltStream>();
+                       this.processor = processor;
+               }
+               
+               @Override
+               public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context,
+                               OutputCollector collector) {
+                       this.collector = collector;
+                       //Processor and this class share the same instance of stream
+                       for(StormBoltStream stream: streams){
+                               stream.setCollector(this.collector);
+                       }
+                       
+                       this.processor.onCreate(context.getThisTaskId());
+               }
+
+               @Override
+               public void execute(Tuple input) {
+                       Object sentObject = input.getValue(0);
+                       ContentEvent sentEvent = (ContentEvent)sentObject;
+                       processor.process(sentEvent);
+               }
+
+               @Override
+               public void declareOutputFields(OutputFieldsDeclarer declarer) {
+                       for(StormStream stream: streams){
+                               declarer.declareStream(stream.getOutputId(), 
+                                               new Fields(StormSamoaUtils.CONTENT_EVENT_FIELD,
+                                                               StormSamoaUtils.KEY_FIELD));
+                       }
+               }
+               
+               StormStream createStream(String piId){
+                       StormBoltStream stream = new StormBoltStream(piId);
+                       streams.add(stream);
+                       return stream;
+               }
+       }
+}
+
+
