http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/assembly/samoa-s4.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/assembly/samoa-s4.xml 
b/samoa-s4/src/main/assembly/samoa-s4.xml
new file mode 100644
index 0000000..8e5614a
--- /dev/null
+++ b/samoa-s4/src/main/assembly/samoa-s4.xml
@@ -0,0 +1,64 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
    <!-- Builds a single jar (id "dist") without a base directory:
         lib/ receives the contents of ../samoa-api/target/lib and target/lib,
         app/ receives the samoa-api and samoa-s4 jars. -->
    <id>dist</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>

    <fileSets>
        <!-- SAMOA API artifacts -->
        <fileSet>
            <outputDirectory>lib/</outputDirectory>
            <directory>../samoa-api/target/lib/</directory>
            <includes>
                <include>*</include>
            </includes>
        </fileSet>
        <fileSet>
            <outputDirectory>app/</outputDirectory>
            <directory>../samoa-api/target/</directory>
            <includes>
                <include>samoa-api-*.jar</include>
            </includes>
        </fileSet>

        <!-- SAMOA S4 artifacts -->
        <fileSet>
            <outputDirectory>app/</outputDirectory>
            <directory>target/</directory>
            <includes>
                <include>samoa-s4-*.jar</include>
            </includes>
            <!-- By default the assembly plugin writes its own output into target/
                 with the "-dist" suffix; never re-package a dist jar left over
                 from a previous build. -->
            <excludes>
                <exclude>samoa-s4-*-dist.jar</exclude>
            </excludes>
        </fileSet>
        <fileSet>
            <outputDirectory>/</outputDirectory>
            <directory>target/</directory>
            <includes>
                <include>lib/*</include>
            </includes>
        </fileSet>
    </fileSets>

</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
new file mode 100644
index 0000000..33299ac
--- /dev/null
+++ 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
@@ -0,0 +1,97 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * S4 Platform Component Factory
+ * 
+ * @author severien
+ * 
+ */
+public class S4ComponentFactory implements ComponentFactory {
+
+    public static final Logger logger = 
LoggerFactory.getLogger(S4ComponentFactory.class);
+    protected S4DoTask app;
+
+    @Override
+    public ProcessingItem createPi(Processor processor, int paralellism) {
+        S4ProcessingItem processingItem = new S4ProcessingItem(app);
+        // TODO refactor how to set the paralellism level
+        processingItem.setParalellismLevel(paralellism);
+        processingItem.setProcessor(processor);
+
+        return processingItem;
+    }
+
+    @Override
+    public ProcessingItem createPi(Processor processor) {
+        return this.createPi(processor, 1);
+    }
+
+    @Override
+    public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+        // TODO Create source Entry processing item that connects to an 
external stream
+        S4EntranceProcessingItem entrancePi = new 
S4EntranceProcessingItem(entranceProcessor, app);
+        entrancePi.setParallelism(1); // FIXME should not be set to 1 
statically
+        return entrancePi;
+    }
+
+    @Override
+    public Stream createStream(IProcessingItem sourcePi) {
+        S4Stream aStream = new S4Stream(app);
+        return aStream;
+    }
+
+    @Override
+    public Topology createTopology(String topoName) {
+        return new S4Topology(topoName);
+    }
+
+    /**
+     * Initialization method.
+     * 
+     * @param evalTask
+     */
+    public void init(String evalTask) {
+        // Task is initiated in the DoTaskApp
+    }
+
+    /**
+     * Sets S4 application.
+     * 
+     * @param app
+     */
+    public void setApp(S4DoTask app) {
+        this.app = app;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
new file mode 100644
index 0000000..0f474a4
--- /dev/null
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
@@ -0,0 +1,263 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.github.javacliparser.Option;
+import com.github.javacliparser.ClassOption;
+import com.yahoo.labs.samoa.core.Globals;
+import com.yahoo.labs.samoa.tasks.Task;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/*
+ * S4 App that runs samoa Tasks
+ *
+ * */
+
+/**
+ * The Class DoTaskApp.
+ */
+final public class S4DoTask extends App {
+
+    private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
+    Task task;
+
+    @Inject @Named("evalTask") public String evalTask;
+
+    public S4DoTask() {
+        super();
+    }
+
+    /** The engine. */
+    protected ComponentFactory componentFactory;
+
+    /**
+     * Gets the factory.
+     * 
+     * @return the factory
+     */
+    public ComponentFactory getFactory() {
+        return componentFactory;
+    }
+
+    /**
+     * Sets the factory.
+     * 
+     * @param factory
+     *            the new factory
+     */
+    public void setFactory(ComponentFactory factory) {
+        this.componentFactory = factory;
+    }
+
+    /*
+     * Build the application
+     * 
+     * @see org.apache.s4.core.App#onInit()
+     */
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.App#onInit()
+     */
+    @Override
+    protected void onInit() {
+        logger.info("DoTaskApp onInit");
+        // ConsoleReporters prints S4 metrics
+        // MetricsRegistry mr = new MetricsRegistry();
+        //
+        // CsvReporter.enable(new File(System.getProperty("user.home")
+        // + "/monitor/"), 10, TimeUnit.SECONDS);
+        // ConsoleReporter.enable(10, TimeUnit.SECONDS);
+        try {
+            System.err.println();
+            System.err.println(Globals.getWorkbenchInfoString());
+            System.err.println();
+
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+        S4ComponentFactory factory = new S4ComponentFactory();
+        factory.setApp(this);
+
+        // logger.debug("LC {}", lc);
+
+        // task = TaskProvider.getTask(evalTask);
+
+        // EXAMPLE OPTIONS
+        // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
+        // 5 -N 0.0)
+        // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
+        // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
+        // "-K", "5", "-N", "0.0)"};
+        // String[] args = new String[] { evalTask, "-l", 
"clustream.Clustream",
+        // "-g", "clustream.Clustream", "-i", "100000", "-s",
+        // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
+        logger.debug("PARAMETERS {}", evalTask);
+        // params = params.replace(":", " ");
+        List<String> parameters = new ArrayList<String>();
+        // parameters.add(evalTask);
+        try {
+            parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, 
"UTF-8").split(" ")));
+        } catch (UnsupportedEncodingException ex) {
+            ex.printStackTrace();
+        }
+        String[] args = parameters.toArray(new String[0]);
+        Option[] extraOptions = new Option[] {};
+        // build a single string by concatenating cli options
+        StringBuilder cliString = new StringBuilder();
+        for (int i = 0; i < args.length; i++) {
+            cliString.append(" ").append(args[i]);
+        }
+
+        // parse options
+        try {
+            task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, extraOptions);
+            task.setFactory(factory);
+            task.init();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.App#onStart()
+     */
+    @Override
+    protected void onStart() {
+        logger.info("Starting DoTaskApp... App Partition [{}]", 
this.getPartitionId());
+        // <<<<<<< HEAD Task doesn't have start in latest storm-impl
+        // TODO change the way the app starts
+        // if (this.getPartitionId() == 0)
+        S4Topology s4topology = (S4Topology) getTask().getTopology();
+        S4EntranceProcessingItem epi = (S4EntranceProcessingItem) 
s4topology.getEntranceProcessingItem();
+        while (epi.injectNextEvent())
+            // inject events from the EntrancePI
+            ;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.App#onClose()
+     */
+    @Override
+    protected void onClose() {
+        System.out.println("Closing DoTaskApp...");
+
+    }
+
+    /**
+     * Gets the task.
+     * 
+     * @return the task
+     */
+    public Task getTask() {
+        return task;
+    }
+
+    // These methods are protected in App and can not be accessed from outside.
+    // They are
+    // called from parallel classifiers and evaluations. Is there a better way
+    // to do that?
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.App#createPE(java.lang.Class)
+     */
+    @Override
+    public <T extends ProcessingElement> T createPE(Class<T> type) {
+        return super.createPE(type);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.App#createStream(java.lang.String, 
org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
+     */
+    @Override
+    public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> 
finder, ProcessingElement... processingElements) {
+        return super.createStream(name, finder, processingElements);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.App#createStream(java.lang.String, 
org.apache.s4.core.ProcessingElement[])
+     */
+    @Override
+    public <T extends Event> Stream<T> createStream(String name, 
ProcessingElement... processingElements) {
+        return super.createStream(name, processingElements);
+    }
+
+    // @com.beust.jcommander.Parameters(separators = "=")
+    // class Parameters {
+    //
+    // @Parameter(names={"-lc","-local"}, description="Local clustering 
method")
+    // private String localClustering;
+    //
+    // @Parameter(names={"-gc","-global"},
+    // description="Global clustering method")
+    // private String globalClustering;
+    //
+    // }
+    //
+    // class ParametersConverter {// implements IStringConverter<String[]> {
+    //
+    //
+    // public String[] convertToArgs(String value) {
+    //
+    // String[] params = value.split(",");
+    // String[] args = new String[params.length*2];
+    // for(int i=0; i<params.length ; i++) {
+    // args[i] = params[i].split("=")[0];
+    // args[i+1] = params[i].split("=")[1];
+    // i++;
+    // }
+    // return args;
+    // }
+    //
+    // }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
new file mode 100644
index 0000000..2b0c595
--- /dev/null
+++ 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
@@ -0,0 +1,120 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+
+// TODO adapt this entrance processing item to connect to external streams so 
the application doesnt need to use an AdapterApp
+
+public class S4EntranceProcessingItem extends ProcessingElement implements 
EntranceProcessingItem {
+
+    private EntranceProcessor entranceProcessor;
+    // private S4DoTask app;
+    private int parallelism;
+    protected Stream outputStream;
+
+    /**
+     * Constructor of an S4 entrance processing item.
+     * 
+     * @param app
+     *            : S4 application
+     */
+    public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App 
app) {
+        super(app);
+        this.entranceProcessor = entranceProcessor;
+        // this.app = (S4DoTask) app;
+        // this.setSingleton(true);
+    }
+
+    public void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    public int getParallelism() {
+        return this.parallelism;
+    }
+
+    @Override
+    public EntranceProcessor getProcessor() {
+        return this.entranceProcessor;
+    }
+
+    //
+    // @Override
+    // public void put(Instance inst) {
+    // // do nothing
+    // // may not needed
+    // }
+
+    @Override
+    protected void onCreate() {
+        // was commented
+        if (this.entranceProcessor != null) {
+            // TODO revisit if we need to change it to a clone() call
+            this.entranceProcessor = (EntranceProcessor) 
this.entranceProcessor.newProcessor(this.entranceProcessor);
+            this.entranceProcessor.onCreate(Integer.parseInt(getId()));
+        }
+    }
+
+    @Override
+    protected void onRemove() {
+        // do nothing
+    }
+
+    //
+    // /**
+    // * Sets the entrance processing item processor.
+    // *
+    // * @param processor
+    // */
+    // public void setProcessor(Processor processor) {
+    // this.entranceProcessor = processor;
+    // }
+
+    @Override
+    public void setName(String name) {
+        super.setName(name);
+    }
+
+    @Override
+    public EntranceProcessingItem setOutputStream(Stream stream) {
+        if (this.outputStream != null)
+            throw new IllegalStateException("Output stream for an EntrancePI 
sohuld be initialized only once");
+        this.outputStream = stream;
+        return this;
+    }
+
+    public boolean injectNextEvent() {
+        if (entranceProcessor.hasNext()) {
+            ContentEvent nextEvent = this.entranceProcessor.nextEvent();
+            outputStream.put(nextEvent);
+            return entranceProcessor.hasNext();
+        } else
+            return false;
+        // return !nextEvent.isLastEvent();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
new file mode 100644
index 0000000..8f8ad9f
--- /dev/null
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
@@ -0,0 +1,90 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import net.jcip.annotations.Immutable;
+
+import org.apache.s4.base.Event;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * The Class InstanceEvent.
+ */
+@Immutable
+final public class S4Event extends Event {
+
+       private String key;
+       
+       public String getKey() {
+               return key;
+       }
+
+       public void setKey(String key) {
+               this.key = key;
+       }
+
+       /** The content event. */
+       private ContentEvent contentEvent;
+       
+       /**
+        * Instantiates a new instance event.
+        */
+       public S4Event() {
+               // Needed for serialization of kryo
+       }
+
+       /**
+        * Instantiates a new instance event.
+        *
+        * @param contentEvent the content event
+        */
+       public S4Event(ContentEvent contentEvent) {
+               if (contentEvent != null) {
+                       this.contentEvent = contentEvent;
+                       this.key = contentEvent.getKey();
+                       
+               }
+       }
+
+       /**
+        * Gets the content event.
+        *
+        * @return the content event
+        */
+       public ContentEvent getContentEvent() {
+               return contentEvent;
+       }
+
+       /**
+        * Sets the content event.
+        *
+        * @param contentEvent the new content event
+        */
+       public void setContentEvent(ContentEvent contentEvent) {
+               this.contentEvent = contentEvent;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
new file mode 100644
index 0000000..1351159
--- /dev/null
+++ 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
@@ -0,0 +1,188 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * S4 Platform platform specific processing item, inherits from S4 
ProcessinElemnt.
+ * 
+ * @author severien
+ *
+ */
+public class S4ProcessingItem extends ProcessingElement implements
+               ProcessingItem {
+
+       public static final Logger logger = LoggerFactory
+                       .getLogger(S4ProcessingItem.class);
+
+       private Processor processor;
+       private int paralellismLevel;
+       private S4DoTask app;
+
+       private static final String NAME="PROCESSING-ITEM-";
+       private static int OBJ_COUNTER=0;
+       
+       /**
+        * Constructor of S4 ProcessingItem.
+        * 
+        * @param app : S4 application
+        */
+       public S4ProcessingItem(App app) {
+               super(app);
+               super.setName(NAME+OBJ_COUNTER);
+               OBJ_COUNTER++;
+               this.app = (S4DoTask) app;
+               this.paralellismLevel = 1;
+       }
+
+       @Override
+       public String getName() {
+               return super.getName();
+       }
+       
+       /**
+        * Gets processing item paralellism level.
+        * 
+        * @return int
+        */
+       public int getParalellismLevel() {
+               return paralellismLevel;
+       }
+
+       /**
+        * Sets processing item paralellism level.
+        * 
+        * @param paralellismLevel
+        */
+       public void setParalellismLevel(int paralellismLevel) {
+               this.paralellismLevel = paralellismLevel;
+       }
+
+       /**
+        * onEvent method.
+        * 
+        * @param event
+        */
+       public void onEvent(S4Event event) {
+               if (processor.process(event.getContentEvent()) == true) {
+                       close();
+               }
+       }
+
+       /**
+        * Sets S4 processing item processor.
+        * 
+        * @param processor
+        */
+       public void setProcessor(Processor processor) {
+               this.processor = processor;
+       }
+
+       // Methods from ProcessingItem
+       @Override
+       public Processor getProcessor() {
+               return processor;
+       }
+
+       /**
+        * KeyFinder sets the keys for a specific event.
+        * 
+        * @return KeyFinder
+        */
+       private KeyFinder<S4Event> getKeyFinder() {
+               KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() {
+                       @Override
+                       public List<String> get(S4Event s4event) {
+                               List<String> results = new ArrayList<String>();
+                               results.add(s4event.getKey());
+                               return results;
+                       }
+               };
+
+               return keyFinder;
+       }
+       
+       
+       @Override
+       public ProcessingItem connectInputAllStream(Stream inputStream) {
+
+               S4Stream stream = (S4Stream) inputStream;
+               stream.setParallelism(this.paralellismLevel);
+               stream.addStream(inputStream.getStreamId(),
+                               getKeyFinder(), this, S4Stream.BROADCAST);
+               return this;
+       }
+
+       
+       @Override
+       public ProcessingItem connectInputKeyStream(Stream inputStream) {
+
+               S4Stream stream = (S4Stream) inputStream;
+               stream.setParallelism(this.paralellismLevel);
+               stream.addStream(inputStream.getStreamId(),
+                               getKeyFinder(), this,S4Stream.GROUP_BY_KEY);
+
+               return this;
+       }
+       
+       @Override
+       public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+               S4Stream stream = (S4Stream) inputStream;
+               stream.setParallelism(this.paralellismLevel);
+               stream.addStream(inputStream.getStreamId(),
+                               getKeyFinder(), this,S4Stream.SHUFFLE);
+
+               return this;
+       }
+
+       // Methods from ProcessingElement
+       @Override
+       protected void onCreate() {
+               logger.debug("PE ID {}", getId());              
+                               if (this.processor != null) {
+                       this.processor = 
this.processor.newProcessor(this.processor);
+                       this.processor.onCreate(Integer.parseInt(getId()));
+               }
+       }
+
+       @Override
+       protected void onRemove() {
+               // do nothing
+       }
+
+       @Override
+       public int getParallelism() {
+               return this.paralellismLevel;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
new file mode 100644
index 0000000..78a3266
--- /dev/null
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
@@ -0,0 +1,185 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.s4.base.KeyFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+
+/**
+ * S4 Platform specific stream.
+ * 
+ * @author severien
+ *
+ */
+public class S4Stream extends AbstractStream {
+
+       public static final int SHUFFLE = 0;
+       public static final int GROUP_BY_KEY = 1;
+       public static final int BROADCAST = 2;
+
+       private static final Logger logger = 
LoggerFactory.getLogger(S4Stream.class);
+
+       private S4DoTask app;
+       private int processingItemParalellism;
+       private int shuffleCounter;
+
+       private static final String NAME = "STREAM-";
+       private static int OBJ_COUNTER = 0;
+       
+       /* The stream list */
+       public List<StreamType> streams;
+
+       public S4Stream(S4DoTask app) {
+               super();
+               this.app = app;
+               this.processingItemParalellism = 1;
+               this.shuffleCounter = 0;
+               this.streams = new ArrayList<StreamType>();
+               this.setStreamId(NAME+OBJ_COUNTER);
+               OBJ_COUNTER++;
+       }
+       
+       public S4Stream(S4DoTask app, S4ProcessingItem pi) {
+               super();
+               this.app = app;
+               this.processingItemParalellism = 1;
+               this.shuffleCounter = 0;
+               this.streams = new ArrayList<StreamType>();
+               this.setStreamId(NAME+OBJ_COUNTER);
+               OBJ_COUNTER++;
+               
+       }
+
+       /**
+        * 
+        * @return
+        */
+       public int getParallelism() {
+               return processingItemParalellism;
+       }
+
+       public void setParallelism(int parallelism) {
+               this.processingItemParalellism = parallelism;
+       }
+
+       public void addStream(String streamID, KeyFinder<S4Event> finder,
+                       S4ProcessingItem pi, int type) {
+               String streamName = streamID +"_"+pi.getName(); 
+               org.apache.s4.core.Stream<S4Event> stream = 
this.app.createStream(
+                               streamName, pi);
+               stream.setName(streamName);
+               logger.debug("Stream name S4Stream {}", streamName);
+               if (finder != null)
+                       stream.setKey(finder);
+               this.streams.add(new StreamType(stream, type));
+
+       }
+
+       @Override
+       public void put(ContentEvent event) {
+
+               for (int i = 0; i < streams.size(); i++) {
+
+                       switch (streams.get(i).getType()) {
+                       case SHUFFLE:
+                               S4Event s4event = new S4Event(event);
+                               
s4event.setStreamId(streams.get(i).getStream().getName());
+                               if(getParallelism() == 1) {
+                                       s4event.setKey("0");
+                               }else {
+                                       
s4event.setKey(Integer.toString(shuffleCounter));
+                               }
+                               streams.get(i).getStream().put(s4event);
+                               shuffleCounter++;
+                                if (shuffleCounter >= (getParallelism())) {
+                                       shuffleCounter = 0;
+                               }
+                               
+                               break;
+
+                       case GROUP_BY_KEY:
+                               S4Event s4event1 = new S4Event(event);
+                               
s4event1.setStreamId(streams.get(i).getStream().getName());
+                               HashCodeBuilder hb = new HashCodeBuilder();
+                               hb.append(event.getKey());
+                               String key = Integer.toString(hb.build() % 
getParallelism());
+                               s4event1.setKey(key);
+                               streams.get(i).getStream().put(s4event1);
+                               break;
+                               
+                       case BROADCAST:
+                               for (int p = 0; p < this.getParallelism(); p++) 
{
+                                       S4Event s4event2 = new S4Event(event);
+                                       
s4event2.setStreamId(streams.get(i).getStream().getName());
+                                       s4event2.setKey(Integer.toString(p));
+                                       
streams.get(i).getStream().put(s4event2);
+                               }
+                               break;
+
+                       default:
+                               break;
+                       }
+
+                       
+               }
+
+       }
+
+       /**
+        * Subclass for definig stream connection type
+        * @author severien
+        *
+        */
+       class StreamType {
+               org.apache.s4.core.Stream<S4Event> stream;
+               int type;
+
+               public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
+                       this.stream = s;
+                       this.type = t;
+               }
+
+               public org.apache.s4.core.Stream<S4Event> getStream() {
+                       return stream;
+               }
+
+               public void setStream(org.apache.s4.core.Stream<S4Event> 
stream) {
+                       this.stream = stream;
+               }
+
+               public int getType() {
+                       return type;
+               }
+
+               public void setType(int type) {
+                       this.type = type;
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
new file mode 100644
index 0000000..c7ef92c
--- /dev/null
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
@@ -0,0 +1,146 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParsingUtils;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.tasks.Task;
+import com.yahoo.labs.samoa.topology.ISubmitter;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class S4Submitter implements ISubmitter {
+
+       private static Logger logger = 
LoggerFactory.getLogger(S4Submitter.class);
+
+       @Override
+       public void deployTask(Task task) {
+               // TODO: Get application FROM HTTP server
+               // TODO: Initializa a http server to serve the app package
+               
+               String appURIString = null;
+//             File app = new File(System.getProperty("user.dir")
+//                             + "/src/site/dist/SAMOA-S4-0.1-dist.jar");
+               
+               // TODO: String app url 
http://localhost:8000/SAMOA-S4-0.1-dist.jar
+               try {
+                       URL appURL = new 
URL("http://localhost:8000/SAMOA-S4-0.1.jar";);
+                       appURIString = appURL.toString();
+               } catch (MalformedURLException e1) {
+                       e1.printStackTrace();
+               }
+               
+//             try {
+//                     appURIString = app.toURI().toURL().toString();
+//             } catch (MalformedURLException e) {
+//                     e.printStackTrace();
+//             }
+               if (task == null) {
+                       logger.error("Can't execute since evaluation task is 
not set!");
+                       return;
+               } else {
+                       logger.info("Deploying SAMOA S4 task [{}] from location 
[{}]. ",
+                                       task.getClass().getSimpleName(), 
appURIString);
+               }
+
+               String[] args = { "-c=testCluster2",
+                               "-appClass=" + S4DoTask.class.getName(),
+                               "-appName=" + "samoaApp",
+                               "-p=evalTask=" + 
task.getClass().getSimpleName(),
+                               "-zk=localhost:2181", "-s4r=" + appURIString , 
"-emc=" + SamoaSerializerModule.class.getName()};
+               // "-emc=" + S4MOAModule.class.getName(),
+               // "@" +
+               // Resources.getResource("s4moa.properties").getFile(),
+
+               S4Config s4config = new S4Config();
+               JCommander jc = new JCommander(s4config);
+               jc.parse(args);
+
+               Map<String, String> namedParameters = new HashMap<String, 
String>();
+               for (String parameter : s4config.namedParameters) {
+                       String[] param = parameter.split("=");
+                       namedParameters.put(param[0], param[1]);
+               }
+
+               AppConfig config = new AppConfig.Builder()
+                               
.appClassName(s4config.appClass).appName(s4config.appName)
+                               
.appURI(s4config.appURI).namedParameters(namedParameters)
+                               .build();
+
+               DeploymentUtils.initAppConfig(config, s4config.clusterName, 
true,
+                               s4config.zkString);
+
+               System.out.println("Suposedly deployed on S4");
+       }
+
+       
+       public void initHTTPServer() {
+               
+       }
+       
+       @Parameters(separators = "=")
+       public static class S4Config {
+
+               @Parameter(names = { "-c", "-cluster" }, description = "Cluster 
name", required = true)
+               String clusterName = null;
+
+               @Parameter(names = "-appClass", description = "Main App class", 
required = false)
+               String appClass = null;
+
+               @Parameter(names = "-appName", description = "Application 
name", required = false)
+               String appName = null;
+
+               @Parameter(names = "-s4r", description = "Application URI", 
required = false)
+               String appURI = null;
+
+               @Parameter(names = "-zk", description = "ZooKeeper connection 
string", required = false)
+               String zkString = null;
+
+               @Parameter(names = { "-extraModulesClasses", "-emc" }, 
description = "Comma-separated list of additional configuration modules (they 
will be instantiated through their constructor without arguments).", required = 
false)
+               List<String> extraModules = new ArrayList<String>();
+
+               @Parameter(names = { "-p", "-namedStringParameters" }, 
description = "Comma-separated list of inline configuration "
+                               + "parameters, taking precedence over 
homonymous configuration parameters from configuration files. "
+                               + "Syntax: '-p=name1=value1,name2=value2 '", 
required = false, converter = ParsingUtils.InlineConfigParameterConverter.class)
+               List<String> namedParameters = new ArrayList<String>();
+
+       }
+
+       @Override
+       public void setLocal(boolean bool) {
+               // TODO S4 works the same for local and distributed environments
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
new file mode 100644
index 0000000..6bef0e8
--- /dev/null
+++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
@@ -0,0 +1,61 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+
+public class S4Topology extends AbstractTopology {
+       
+       // CASEY: it seems evaluationTask is not used. 
+       // Remove it for now
+    
+//     private String _evaluationTask;
+
+//    S4Topology(String topoName, String evalTask) {
+//        super(topoName);
+//    }
+//
+//    S4Topology(String topoName) {
+//        this(topoName, null);
+//    }
+
+//    @Override
+//    public void setEvaluationTask(String evalTask) {
+//        _evaluationTask = evalTask;
+//    }
+//
+//    @Override
+//    public String getEvaluationTask() {
+//        return _evaluationTask;
+//    }
+    
+       S4Topology(String topoName) {
+               super(topoName);
+       }
+       
+    protected EntranceProcessingItem getEntranceProcessingItem() {
+       if (this.getEntranceProcessingItems() == null) return null;
+       if (this.getEntranceProcessingItems().size() < 1) return null;
+       // TODO: support multiple entrance PIs
+       return 
(EntranceProcessingItem)this.getEntranceProcessingItems().toArray()[0];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
new file mode 100644
index 0000000..4ae2296
--- /dev/null
+++ 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
@@ -0,0 +1,99 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.nio.ByteBuffer;
+
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.yahoo.labs.samoa.learners.classifiers.trees.AttributeContentEvent;
+import com.yahoo.labs.samoa.learners.classifiers.trees.ComputeContentEvent;
+
+public class SamoaSerializer implements SerializerDeserializer{
+
+       private ThreadLocal<Kryo> kryoThreadLocal;
+    private ThreadLocal<Output> outputThreadLocal;
+
+    private int initialBufferSize = 2048;
+    private int maxBufferSize = 256 * 1024;
+
+    public void setMaxBufferSize(int maxBufferSize) {
+        this.maxBufferSize = maxBufferSize;
+    }
+
+    /**
+     * 
+     * @param classLoader
+     *            classloader able to handle classes to serialize/deserialize. 
For instance, application-level events
+     *            can only be handled by the application classloader.
+     */
+    @Inject
+    public SamoaSerializer(@Assisted final ClassLoader classLoader) {
+        kryoThreadLocal = new ThreadLocal<Kryo>() {
+
+            @Override
+            protected Kryo initialValue() {
+                Kryo kryo = new Kryo();
+                kryo.setClassLoader(classLoader);
+                kryo.register(AttributeContentEvent.class, new 
AttributeContentEvent.AttributeCEFullPrecSerializer());
+                kryo.register(ComputeContentEvent.class, new 
ComputeContentEvent.ComputeCEFullPrecSerializer());
+                kryo.setRegistrationRequired(false);
+                return kryo;
+            }
+        };
+
+        outputThreadLocal = new ThreadLocal<Output>() {
+            @Override
+            protected Output initialValue() {
+                Output output = new Output(initialBufferSize, maxBufferSize);
+                return output;
+            }
+        };
+
+    }
+
+    @Override
+    public Object deserialize(ByteBuffer rawMessage) {
+        Input input = new Input(rawMessage.array());
+        try {
+            return kryoThreadLocal.get().readClassAndObject(input);
+        } finally {
+            input.close();
+        }
+    }
+
+    @SuppressWarnings("resource")
+    @Override
+    public ByteBuffer serialize(Object message) {
+        Output output = outputThreadLocal.get();
+        try {
+            kryoThreadLocal.get().writeClassAndObject(output, message);
+            return ByteBuffer.wrap(output.toBytes());
+        } finally {
+            output.clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
new file mode 100644
index 0000000..311e449
--- /dev/null
+++ 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
@@ -0,0 +1,35 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.google.inject.AbstractModule;
+
+public class SamoaSerializerModule extends AbstractModule {
+
+       /**
+        * Binds S4's {@link SerializerDeserializer} to {@link SamoaSerializer}, the
+        * Kryo-based serializer that registers SAMOA's tree-learner content events.
+        *
+        * <p>NOTE(review): SamoaSerializer's constructor takes an {@code @Assisted}
+        * ClassLoader; confirm that this plain binding suffices, or whether an
+        * assisted-injection factory is needed.
+        */
+       @Override
+       protected void configure() {
+               final Class<SamoaSerializer> implementation = SamoaSerializer.class;
+               bind(SerializerDeserializer.class).to(implementation);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-samza/pom.xml b/samoa-samza/pom.xml
new file mode 100644
index 0000000..8fd60a7
--- /dev/null
+++ b/samoa-samza/pom.xml
@@ -0,0 +1,168 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-samza</name>
+    <description>Samza engine for SAMOA</description>
+
+    <artifactId>samoa-samza</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j-log4j12.version}</version>
+        </dependency>
+
+        <!--<dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>11.0.2</version>
+        </dependency> -->
+
+        <dependency>
+               <groupId>org.apache.samza</groupId>
+               <artifactId>samza-api</artifactId>
+               <version>${samza.version}</version>
+       </dependency>
+       
+       <dependency>
+               <groupId>org.apache.samza</groupId>
+               <artifactId>samza-core_2.10</artifactId>
+               <version>${samza.version}</version>
+       </dependency>
+      
+       <dependency>
+               <groupId>org.apache.samza</groupId>
+               <artifactId>samza-serializers_2.10</artifactId>
+               <version>${samza.version}</version>
+       </dependency>
+      
+       <!--<dependency>
+               <groupId>org.apache.samza</groupId>
+               <artifactId>samza-shell</artifactId>
+                       <classifier>dist</classifier>
+               <type>tgz</type>
+               <version>${samza.version}</version>
+       </dependency>-->
+      
+       <dependency>
+               <groupId>org.apache.samza</groupId>
+               <artifactId>samza-yarn_2.10</artifactId>
+               <version>${samza.version}</version>
+       </dependency>
+
+       <dependency>
+                <groupId>org.apache.samza</groupId>
+                <artifactId>samza-kafka_2.10</artifactId>
+                <version>${samza.version}</version>
+        </dependency>
+
+       <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka_2.10</artifactId>
+                <version>${kafka.version}</version>
+        </dependency>
+       <dependency>
+               <groupId>com.101tec</groupId>
+               <artifactId>zkclient</artifactId>
+               <version>0.4</version>
+       </dependency>
+       <dependency>
+               <groupId>commons-io</groupId>
+               <artifactId>commons-io</artifactId>
+               <version>2.1</version>
+       </dependency>
+       <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-common</artifactId>
+               <version>${hadoop.version}</version>
+       </dependency>
+       <dependency>
+               <groupId>org.apache.hadoop</groupId>
+               <artifactId>hadoop-hdfs</artifactId>
+               <version>${hadoop.version}</version> 
+       </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- SAMOA assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    
+                    <descriptors>
+                        <descriptor>src/main/assembly/samoa-samza.xml</descriptor>
+                    </descriptors>
+                    <finalName>SAMOA-Samza-${project.version}</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <!--
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    -->
+                    <archive>
+                        <manifestEntries>
+                            <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>
+                            <Bundle-Description>${project.description}</Bundle-Description>
+                            <Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
+                            <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for 
inheritance merges -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+               <resources>
+            <resource>
+               <directory>src/main/resources</directory>
+               <filtering>true</filtering>
+            </resource>
+        </resources> 
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/assembly/samoa-samza.xml
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/assembly/samoa-samza.xml 
b/samoa-samza/src/main/assembly/samoa-samza.xml
new file mode 100644
index 0000000..ead1c0a
--- /dev/null
+++ b/samoa-samza/src/main/assembly/samoa-samza.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 - 2014 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to 
+  you under the Apache License, Version 2.0 (the "License"); you may not use 
+  this file except in compliance with the License. You may obtain a copy of 
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+  by applicable law or agreed to in writing, software distributed under the 
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+  OF ANY KIND, either express or implied. See the License for the specific 
+  language governing permissions and limitations under the License. -->
+
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>dist</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/..</directory>
+      <includes>
+        <include>README*</include>
+        <include>LICENSE*</include>
+        <include>NOTICE*</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <outputDirectory>bin</outputDirectory>
+      <directory>${basedir}/../bin/samza-dist</directory>
+      <includes>
+        <include>run-class.sh</include>
+        <include>run-am.sh</include>
+        <include>run-container.sh</include>
+        <include>run-job.sh</include>
+      </includes>
+    </fileSet>
+  </fileSets>
+  <files>
+    <file>
+      <outputDirectory>lib</outputDirectory>
+      <source>${basedir}/src/main/resources/log4j.xml</source>
+    </file>
+  </files>
+  <dependencySets>
+    <!--
+    <dependencySet>
+      <outputDirectory>bin</outputDirectory>
+      <includes>
+        <include>org.apache.samza:samza-shell:tgz:dist:*</include>
+      </includes>
+      <fileMode>0744</fileMode>
+      <unpack>true</unpack>
+    </dependencySet>
+    -->
+    <dependencySet>
+      <unpack>true</unpack>
+      <includes>
+        <include>org.slf4j:slf4j-log4j12</include>
+        <include>org.apache.samza:samza-api</include>
+        <include>org.apache.samza:samza-core_2.10</include>
+        <include>org.apache.samza:samza-serializers_2.10</include>
+        <include>org.apache.samza:samza-yarn_2.10</include>
+        <include>org.apache.samza:samza-kafka_2.10</include>
+        <include>org.apache.kafka:kafka_2.10</include>
+      </includes>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+    </dependencySet>
+    <dependencySet>
+      <unpack>true</unpack>
+      <includes>
+        <include>${groupId}:samoa-api</include>
+        <include>${groupId}:${artifactId}</include>
+      </includes>
+      <unpackOptions>
+        <excludes>
+          <exclude>META-INF/services/org.apache.hadoop.security.*</exclude>
+        </excludes>
+      </unpackOptions>
+      <useTransitiveFiltering>true</useTransitiveFiltering>
+      <useTransitiveDependencies>true</useTransitiveDependencies>
+    </dependencySet>
+  </dependencySets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
new file mode 100644
index 0000000..45dd901
--- /dev/null
+++ b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
@@ -0,0 +1,227 @@
+package com.yahoo.labs.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.yahoo.labs.samoa.tasks.Task;
+import com.yahoo.labs.samoa.topology.impl.SamzaComponentFactory;
+import com.yahoo.labs.samoa.topology.impl.SamzaEngine;
+import com.yahoo.labs.samoa.topology.impl.SamzaTopology;
+import com.yahoo.labs.samoa.utils.SystemsUtils;
+
+/**
+ * Main class to run the task on Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaDoTask {
+
+       private static final Logger logger = 
LoggerFactory.getLogger(SamzaDoTask.class);
+       
+       // Values accepted by --mode: isLocalMode() matches only LOCAL_MODE; any other
+       // value selects YARN mode.
+       private static final String LOCAL_MODE = "local";
+       private static final String YARN_MODE = "yarn";
+       
+       // FLAGS
+       // Command-line flags parsed by parseArguments() in the form flag=value.
+       // Recognised flags are removed from the argument list; the rest is passed to
+       // the SAMOA task (see main).
+       private static final String YARN_CONF_FLAG = "--yarn_home";
+       private static final String MODE_FLAG = "--mode";
+       private static final String ZK_FLAG = "--zookeeper";
+       private static final String KAFKA_FLAG = "--kafka";
+       private static final String KAFKA_REPLICATION_FLAG = 
"--kafka_replication_factor";
+       private static final String CHECKPOINT_FREQ_FLAG = 
"--checkpoint_frequency";
+       private static final String JAR_PACKAGE_FLAG = "--jar_package";
+       private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir";
+       private static final String AM_MEMORY_FLAG = "--yarn_am_mem";
+       private static final String CONTAINER_MEMORY_FLAG = 
"--yarn_container_mem";
+       private static final String PI_PER_CONTAINER_FLAG = 
"--pi_per_container";
+       
+       private static final String KRYO_REGISTER_FLAG = "--kryo_register";
+       
+       // config values
+       // Defaults used when the corresponding flag is absent; parseArguments()
+       // overwrites them and main() passes them on to SamzaEngine.
+       private static int kafkaReplicationFactor = 1;
+       // NOTE(review): presumably milliseconds — confirm against SamzaEngine.
+       private static int checkpointFrequency = 60000;
+       private static String kafka = "localhost:9092";
+       private static String zookeeper = "localhost:2181";
+       private static boolean isLocal = true;
+       private static String yarnConfHome = null;
+       private static String samoaHDFSDir = null;
+       private static String jarPackagePath = null;
+       // NOTE(review): memory sizes presumably in MB — confirm against SamzaEngine.
+       private static int amMem = 1024;
+       private static int containerMem = 1024;
+       private static int piPerContainer = 2;
+       private static String kryoRegisterFile = null;
+       
+       /**
+        * Runs a SAMOA task on Samza.
+        *
+        * <ol>
+        * <li>Extracts the Samza-specific {@code flag=value} arguments; the remaining
+        *     arguments describe the SAMOA task.</li>
+        * <li>Instantiates and initialises the task, which builds its topology.</li>
+        * <li>When not in local mode, uploads the JAR package to HDFS.</li>
+        * <li>Configures {@link SamzaEngine} and submits the topology.</li>
+        * </ol>
+        *
+        * @param args command-line arguments
+        */
+       public static void main(String[] args) {
+               // parseArguments() removes the Samza flags, leaving only the task's own CLI
+               List<String> taskArgs = new ArrayList<String>(Arrays.asList(args));
+               parseArguments(taskArgs);
+
+               // Fail fast: YARN mode needs a JAR package to upload; without one,
+               // getPath(null) below would fail with an exception.
+               if (!isLocal && jarPackagePath == null) {
+                       System.out.println("Missing " + JAR_PACKAGE_FLAG
+                                       + " argument: a JAR package is required when not running in "
+                                       + LOCAL_MODE + " mode.");
+                       return;
+               }
+
+               // Init Task
+               String cliString = buildCliString(taskArgs);
+               logger.debug("Command line string = {}", cliString);
+               System.out.println("Command line string = " + cliString);
+
+               Task task = null;
+               try {
+                       task = (Task) ClassOption.cliStringToObject(cliString, Task.class, null);
+                       logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName());
+               } catch (Exception e) {
+                       logger.error("Fail to initialize the task", e);
+                       System.out.println("Fail to initialize the task" + e);
+                       return;
+               }
+               task.setFactory(new SamzaComponentFactory());
+               task.init();
+
+               // Upload JAR file to HDFS
+               String hdfsPath = null;
+               if (!isLocal) {
+                       Path path = FileSystems.getDefault().getPath(jarPackagePath);
+                       hdfsPath = uploadJarToHDFS(path.toFile());
+                       if (hdfsPath == null) {
+                               System.out.println("Fail uploading JAR file \"" + path.toAbsolutePath().toString() + "\" to HDFS.");
+                               return;
+                       }
+               }
+
+               // Set parameters
+               SamzaEngine.getEngine()
+                               .setLocalMode(isLocal)
+                               .setZooKeeper(zookeeper)
+                               .setKafka(kafka)
+                               .setYarnPackage(hdfsPath)
+                               .setKafkaReplicationFactor(kafkaReplicationFactor)
+                               .setConfigHome(yarnConfHome)
+                               .setAMMemory(amMem)
+                               .setContainerMemory(containerMem)
+                               .setPiPerContainerRatio(piPerContainer)
+                               .setKryoRegisterFile(kryoRegisterFile)
+                               .setCheckpointFrequency(checkpointFrequency);
+
+               // Submit topology
+               SamzaEngine.submitTopology((SamzaTopology) task.getTopology());
+       }
+
+       /**
+        * Joins the arguments into the single command-line string passed to
+        * ClassOption.cliStringToObject; every argument is preceded by one space.
+        *
+        * @param arguments the task's arguments
+        * @return the joined command line (empty string if there are no arguments)
+        */
+       private static String buildCliString(List<String> arguments) {
+               StringBuilder cli = new StringBuilder();
+               for (String argument : arguments) {
+                       cli.append(' ').append(argument);
+               }
+               return cli.toString();
+       }
+       
+       private static boolean isLocalMode(String mode) {
+               return mode.equals(LOCAL_MODE);
+       }
+       
+       private static void parseArguments(List<String> args) {
+               for (int i=args.size()-1; i>=0; i--) {
+                       String arg = args.get(i).trim();
+                       String[] splitted = arg.split("=",2);
+                       
+                       if (splitted.length >= 2) {
+                               // YARN config folder which contains 
conf/core-site.xml,
+                               // conf/hdfs-site.xml, conf/yarn-site.xml
+                               if (splitted[0].equals(YARN_CONF_FLAG)) {
+                                       yarnConfHome = splitted[1];
+                                       args.remove(i);
+                               }
+                               // host:port for zookeeper cluster
+                               else if (splitted[0].equals(ZK_FLAG)) {
+                                       zookeeper = splitted[1];
+                                       args.remove(i);
+                               }
+                               // host:port,... for kafka broker(s)
+                               else if (splitted[0].equals(KAFKA_FLAG)) {
+                                       kafka = splitted[1];
+                                       args.remove(i);
+                               }
+                               // whether we are running Samza in Local mode 
or YARN mode 
+                               else if (splitted[0].equals(MODE_FLAG)) {
+                                       isLocal = isLocalMode(splitted[1]);
+                                       args.remove(i);
+                               }
+                               // memory requirement for YARN application 
master
+                               else if (splitted[0].equals(AM_MEMORY_FLAG)) {
+                                       amMem = Integer.parseInt(splitted[1]);
+                                       args.remove(i);
+                               }
+                               // memory requirement for YARN worker container
+                               else if 
(splitted[0].equals(CONTAINER_MEMORY_FLAG)) {
+                                       containerMem = 
Integer.parseInt(splitted[1]);
+                                       args.remove(i);
+                               }
+                               // the path to JAR archive that we need to 
upload to HDFS
+                               else if (splitted[0].equals(JAR_PACKAGE_FLAG)) {
+                                       jarPackagePath = splitted[1];
+                                       args.remove(i);
+                               }
+                               // the HDFS dir for SAMOA files
+                               else if 
(splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) {
+                                       samoaHDFSDir = splitted[1];
+                                       if (samoaHDFSDir.length() < 1) 
samoaHDFSDir = null;
+                                       args.remove(i);
+                               }
+                               // number of max PI instances per container
+                               // this will be used to compute the number of 
containers 
+                               // AM will request for the job
+                               else if 
(splitted[0].equals(PI_PER_CONTAINER_FLAG)) {
+                                       piPerContainer = 
Integer.parseInt(splitted[1]);
+                                       args.remove(i);
+                               }
+                               // kafka streams replication factor
+                               else if 
(splitted[0].equals(KAFKA_REPLICATION_FLAG)) {
+                                       kafkaReplicationFactor = 
Integer.parseInt(splitted[1]);
+                                       args.remove(i);
+                               }
+                               // checkpoint frequency in ms
+                               else if 
(splitted[0].equals(CHECKPOINT_FREQ_FLAG)) {
+                                       checkpointFrequency = 
Integer.parseInt(splitted[1]);
+                                       args.remove(i);
+                               }
+                               // the file contains registration information 
for Kryo serializer
+                               else if 
(splitted[0].equals(KRYO_REGISTER_FLAG)) {
+                                       kryoRegisterFile = splitted[1];
+                                       args.remove(i);
+                               }
+                       }
+               }
+       }
+       
+       private static String uploadJarToHDFS(File file) {
+               SystemsUtils.setHadoopConfigHome(yarnConfHome);
+               SystemsUtils.setSAMOADir(samoaHDFSDir);
+               return SystemsUtils.copyToHDFS(file, file.getName());
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
new file mode 100644
index 0000000..362e0a5
--- /dev/null
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
@@ -0,0 +1,57 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+
+import 
com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer;
+
+/**
+ * Implementation of Samza's SystemFactory
+ * Samza will use this factory to get our custom consumer
+ * which gets the events from SAMOA EntranceProcessor
+ * and feed them to EntranceProcessingItem task
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamoaSystemFactory implements SystemFactory {
+       @Override
+       public SystemAdmin getAdmin(String systemName, Config config) {
+               return new SinglePartitionWithoutOffsetsSystemAdmin();
+       }
+
+       @Override
+       public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+               return new SamoaSystemConsumer(systemName, config);
+       }
+
+       @Override
+       public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+               throw new SamzaException("This implementation is not supposed 
to produce anything.");
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
new file mode 100644
index 0000000..d71d97b
--- /dev/null
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
@@ -0,0 +1,62 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * Implementation of SAMOA ComponentFactory for Samza
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaComponentFactory implements ComponentFactory {
+       @Override
+       public ProcessingItem createPi(Processor processor) {
+               return this.createPi(processor, 1);
+       }
+
+       @Override
+       public ProcessingItem createPi(Processor processor, int parallelism) {
+               return new SamzaProcessingItem(processor, parallelism);
+       }
+
+       @Override
+       public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+               return new SamzaEntranceProcessingItem(entranceProcessor);
+       }
+       
+       @Override
+       public Stream createStream(IProcessingItem sourcePi) {
+               return new SamzaStream(sourcePi);
+       }
+       
+       @Override
+       public Topology createTopology(String topoName) {
+               return new SamzaTopology(topoName);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
new file mode 100644
index 0000000..7339443
--- /dev/null
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEngine.java
@@ -0,0 +1,197 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.JobRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
+import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
+import com.yahoo.labs.samoa.utils.SystemsUtils;
+
+/**
+ * This class will submit a list of Samza jobs with 
+ * the Configs generated from the input topology
+ * 
+ * @author Anh Thu Vu
+ *
+ */
+public class SamzaEngine {
+       
+       private static final Logger logger = 
LoggerFactory.getLogger(SamzaEngine.class);
+       
+       /*
+        * Singleton instance
+        */
+       private static SamzaEngine engine = new SamzaEngine();
+       
+       private String zookeeper;
+       private String kafka;
+       private int kafkaReplicationFactor;
+       private boolean isLocalMode;
+       private String yarnPackagePath;
+       private String yarnConfHome;
+       
+       private String kryoRegisterFile;
+       
+       private int amMem;
+       private int containerMem;
+       private int piPerContainerRatio;
+       
+       private int checkpointFrequency;
+       
+       private void _submitTopology(SamzaTopology topology) {
+               
+               // Setup SamzaConfigFactory
+               SamzaConfigFactory configFactory = new SamzaConfigFactory();
+               configFactory.setLocalMode(isLocalMode)
+               .setZookeeper(zookeeper)
+               .setKafka(kafka)
+               .setYarnPackage(yarnPackagePath)
+               .setAMMemory(amMem)
+               .setContainerMemory(containerMem)
+               .setPiPerContainerRatio(piPerContainerRatio)
+               .setKryoRegisterFile(kryoRegisterFile)
+               .setCheckpointFrequency(checkpointFrequency)
+               .setReplicationFactor(kafkaReplicationFactor);
+               
+               // Generate the list of Configs
+               List<MapConfig> configs;
+               try {
+                       // ConfigFactory generate a list of configs
+                       // Serialize a map of PIs and store in a file in the 
jar at jarFilePath
+                       // (in dat/ folder)
+                       configs = 
configFactory.getMapConfigsForTopology(topology);
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       return;
+               }
+               
+               // Create kafka streams
+               Set<Stream> streams = topology.getStreams(); 
+               for (Stream stream:streams) {
+                       SamzaStream samzaStream = (SamzaStream) stream;
+                       List<SamzaSystemStream> systemStreams = 
samzaStream.getSystemStreams();
+                       for (SamzaSystemStream systemStream:systemStreams) {
+                               // all streams should be kafka streams
+                               
SystemsUtils.createKafkaTopic(systemStream.getStream(),systemStream.getParallelism(),kafkaReplicationFactor);
+                       }
+               }
+               
+               // Submit the jobs with those configs
+               for (MapConfig config:configs) {
+                       logger.info("Config:{}",config);
+                       JobRunner jobRunner = new JobRunner(config);
+                       jobRunner.run();
+               }
+       }
+
+       private void _setupSystemsUtils() {
+               // Setup Utils
+               if (!isLocalMode)
+                       SystemsUtils.setHadoopConfigHome(yarnConfHome);
+               SystemsUtils.setZookeeper(zookeeper);
+       }
+       
+       /*
+        * Setter methods
+        */
+       public static SamzaEngine getEngine() {
+               return engine;
+       }
+       
+       public SamzaEngine setZooKeeper(String zk) {
+               this.zookeeper = zk;
+               return this;
+       }
+       
+       public SamzaEngine setKafka(String kafka) {
+               this.kafka = kafka;
+               return this;
+       }
+       
+       public SamzaEngine setKafkaReplicationFactor(int replicationFactor) {
+               this.kafkaReplicationFactor = replicationFactor;
+               return this;
+       }
+       
+       public SamzaEngine setCheckpointFrequency(int freq) {
+               this.checkpointFrequency = freq;
+               return this;
+       }
+       
+       public SamzaEngine setLocalMode(boolean isLocal) {
+               this.isLocalMode = isLocal;
+               return this;
+       }
+       
+       public SamzaEngine setYarnPackage(String yarnPackagePath) {
+               this.yarnPackagePath = yarnPackagePath;
+               return this;
+       }
+       
+       public SamzaEngine setConfigHome(String configHome) {
+               this.yarnConfHome = configHome;
+               return this;
+       }
+       
+       public SamzaEngine setAMMemory(int mem) {
+               this.amMem = mem;
+               return this;
+       }
+       
+       public SamzaEngine setContainerMemory(int mem) {
+               this.containerMem = mem;
+               return this;
+       }
+       
+       public SamzaEngine setPiPerContainerRatio(int piPerContainer) {
+               this.piPerContainerRatio = piPerContainer;
+               return this;
+       }
+       
+       public SamzaEngine setKryoRegisterFile(String registerFile) {
+               this.kryoRegisterFile = registerFile;
+               return this;
+       }
+       
+       /**
+        * Submit a list of Samza jobs correspond to the submitted 
+        * topology
+        * 
+        * @param topo
+        *            the submitted topology
+        */
+       public static void submitTopology(SamzaTopology topo) {
+               // Setup SystemsUtils
+               engine._setupSystemsUtils();
+               
+               // Submit topology
+               engine._submitTopology(topo);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
new file mode 100644
index 0000000..e89d789
--- /dev/null
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaEntranceProcessingItem.java
@@ -0,0 +1,222 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
+import com.yahoo.labs.samoa.utils.SystemsUtils;
+
+/**
+ * EntranceProcessingItem for Samza
+ * which is also a Samza task (StreamTask & InitableTask)
+ * 
+ * @author Anh Thu Vu
+ *
+ */
+public class SamzaEntranceProcessingItem extends AbstractEntranceProcessingItem
+                                         implements SamzaProcessingNode, 
Serializable, StreamTask, InitableTask {
+
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 7157734520046135039L;
+       
+       /*
+        * Constructors
+        */
+       public SamzaEntranceProcessingItem(EntranceProcessor processor) {
+               super(processor);
+       }
+       
+       // Need this so Samza can initialize a StreamTask
+       public SamzaEntranceProcessingItem() {} 
+       
+       /*
+        * Simple setters, getters
+        */
+       @Override
+       public int addOutputStream(SamzaStream stream) {
+               this.setOutputStream(stream);
+               return 1; // entrance PI should have only 1 output stream
+       }
+       
+       /*
+        * Serialization
+        */
+       private Object writeReplace() {
+               return new SerializationProxy(this);
+       }
+       
+       private static class SerializationProxy implements Serializable {
+               /**
+                * 
+                */
+               private static final long serialVersionUID = 
313907132721414634L;
+               
+               private EntranceProcessor processor;
+               private SamzaStream outputStream;
+               private String name;
+               
+               public SerializationProxy(SamzaEntranceProcessingItem epi) {
+                       this.processor = epi.getProcessor();
+                       this.outputStream = (SamzaStream)epi.getOutputStream();
+                       this.name = epi.getName();
+               }
+       }
+       
+       /*
+        * Implement Samza Task
+        */
+       @Override
+       public void init(Config config, TaskContext context) throws Exception {
+               String yarnConfHome = 
config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+               if (yarnConfHome != null && yarnConfHome.length() > 0) // if 
the property is set , otherwise, assume we are running in
+                                                                               
                        // local mode and ignore this
+                       SystemsUtils.setHadoopConfigHome(yarnConfHome);
+               
+               String filename = config.get(SamzaConfigFactory.FILE_KEY);
+               String filesystem = 
config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+               
+               this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
+               SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, 
this.getName());
+               this.setOutputStream(wrapper.outputStream);
+               SamzaStream output = (SamzaStream)this.getOutputStream();
+               if (output != null) // if output stream exists, set it up
+                       output.onCreate();
+       }
+
+       @Override
+       public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) throws Exception {
+               SamzaStream output = (SamzaStream)this.getOutputStream();
+               if (output == null) return; // if there is no output stream, do 
nothing
+               output.setCollector(collector);
+               ContentEvent event = (ContentEvent) envelope.getMessage();
+               output.put(event);
+       }
+       
+       /*
+        * Implementation of Samza's SystemConsumer to get events from source
+        * and feed to SAMOA system
+        * 
+        */
+       /* Current implementation: buffer the incoming events and send a batch 
+        * of them when poll() is called by Samza system.
+        * 
+        * Currently: it has a "soft" limit on the size of the buffer:
+        * when the buffer size reaches the limit, the reading thread will sleep
+        * for 100ms.
+        * A hard limit can be achieved by overriding the method
+        * protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue()
+        * of BlockingEnvelopeMap
+        * But then we have handle the case when the queue is full.
+        * 
+        */
+       public static class SamoaSystemConsumer extends BlockingEnvelopeMap {
+               
+               private EntranceProcessor entranceProcessor = null;
+               private SystemStreamPartition systemStreamPartition;
+               
+               private static final Logger logger = 
LoggerFactory.getLogger(SamoaSystemConsumer.class);
+
+               public SamoaSystemConsumer(String systemName, Config config) {
+                       String yarnConfHome = 
config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+                       if (yarnConfHome != null && yarnConfHome.length() > 0) 
// if the property is set , otherwise, assume we are running in
+                                                                           // 
local mode and ignore this
+                               SystemsUtils.setHadoopConfigHome(yarnConfHome);
+                       
+                       String filename = 
config.get(SamzaConfigFactory.FILE_KEY);
+                       String filesystem = 
config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+                       String name = 
config.get(SamzaConfigFactory.JOB_NAME_KEY);
+                       SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, name);
+                       
+                       this.entranceProcessor = wrapper.processor;
+                       this.entranceProcessor.onCreate(0);
+                       
+                       // Internal stream from SystemConsumer to EntranceTask, 
so we
+                       // need only one partition
+                       this.systemStreamPartition = new 
SystemStreamPartition(systemName, wrapper.name, new Partition(0));
+               }
+               
+               @Override
+               public void start() {
+                       Thread processorPollingThread = new Thread(
+                       new Runnable() {
+                           @Override
+                           public void run() {
+                               try {
+                                   pollingEntranceProcessor();
+                                   setIsAtHead(systemStreamPartition, true);
+                               } catch (InterruptedException e) {
+                                   e.getStackTrace();
+                                   stop();
+                               }
+                           }
+                       }
+               );
+
+               processorPollingThread.start();
+               }
+
+               @Override
+               public void stop() {
+
+               }
+               
+               private void pollingEntranceProcessor() throws 
InterruptedException {
+                       int messageCnt = 0;
+                       while(!this.entranceProcessor.isFinished()) {
+                               messageCnt = 
this.getNumMessagesInQueue(systemStreamPartition);
+                               if (this.entranceProcessor.hasNext() && 
messageCnt < 10000) { // soft limit on the size of the queue
+                                       this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition,null, 
null,this.entranceProcessor.nextEvent()));
+                               } else {
+                                       try {
+                                               Thread.sleep(100);
+                                       } catch (InterruptedException e) {
+                                               break;
+                                       }
+                               }
+                       }
+                       
+                       // Send last event
+                       this.put(systemStreamPartition, new 
IncomingMessageEnvelope(systemStreamPartition,null, 
null,this.entranceProcessor.nextEvent()));
+               }
+               
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
new file mode 100644
index 0000000..db72e7c
--- /dev/null
+++ 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaProcessingItem.java
@@ -0,0 +1,165 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.impl.SamzaStream.SamzaSystemStream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.SamzaConfigFactory;
+import com.yahoo.labs.samoa.utils.SystemsUtils;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * ProcessingItem for Samza
+ * which is also a Samza task (StreamTask and InitableTask)
+ * 
+ * @author Anh Thu Vu
+ */
+public class SamzaProcessingItem extends AbstractProcessingItem 
+                                 implements SamzaProcessingNode, Serializable, 
StreamTask, InitableTask {
+       
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 1L;
+
+       private Set<SamzaSystemStream> inputStreams; // input streams: 
system.stream
+       private List<SamzaStream> outputStreams;
+       
+       /*
+        * Constructors
+        */
+       // Need this so Samza can initialize a StreamTask
+       public SamzaProcessingItem() {}
+       
+       /* 
+        * Implement com.yahoo.labs.samoa.topology.ProcessingItem
+        */
+       public SamzaProcessingItem(Processor processor, int parallelismHint) {
+               super(processor, parallelismHint);
+               this.inputStreams = new HashSet<SamzaSystemStream>();
+               this.outputStreams = new LinkedList<SamzaStream>();
+       }
+       
+       /*
+        * Simple setters, getters
+        */
+       public Set<SamzaSystemStream> getInputStreams() {
+               return this.inputStreams;
+       }
+       
+       /*
+        * Extends AbstractProcessingItem
+        */
+       @Override
+       protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+               SamzaSystemStream stream = ((SamzaStream) 
inputStream).addDestination(new 
StreamDestination(this,this.getParallelism(),scheme));
+               this.inputStreams.add(stream);
+               return this;
+       }
+
+       /*
+        * Implement com.yahoo.samoa.topology.impl.SamzaProcessingNode
+        */
+       @Override
+       public int addOutputStream(SamzaStream stream) {
+               this.outputStreams.add(stream);
+               return this.outputStreams.size();
+       }
+       
+       public List<SamzaStream> getOutputStreams() {
+               return this.outputStreams;
+       }
+
+       /*
+        * Implement Samza task
+        */
+       @Override
+       public void init(Config config, TaskContext context) throws Exception {
+               String yarnConfHome = 
config.get(SamzaConfigFactory.YARN_CONF_HOME_KEY);
+               if (yarnConfHome != null && yarnConfHome.length() > 0) // if 
the property is set , otherwise, assume we are running in
+                                                                               
                                // local mode and ignore this
+                       SystemsUtils.setHadoopConfigHome(yarnConfHome);
+               
+               String filename = config.get(SamzaConfigFactory.FILE_KEY);
+               String filesystem = 
config.get(SamzaConfigFactory.FILESYSTEM_KEY);
+               this.setName(config.get(SamzaConfigFactory.JOB_NAME_KEY));
+               SerializationProxy wrapper = (SerializationProxy) 
SystemsUtils.deserializeObjectFromFileAndKey(filesystem, filename, 
this.getName());
+               this.setProcessor(wrapper.processor);
+               this.outputStreams = wrapper.outputStreams;
+               
+               // Init Processor and Streams
+               this.getProcessor().onCreate(0);
+               for (SamzaStream stream:this.outputStreams) {
+                       stream.onCreate();
+               }
+               
+       }
+
+       @Override
+       public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) throws Exception {
+               for (SamzaStream stream:this.outputStreams) {
+                       stream.setCollector(collector);
+               }
+               this.getProcessor().process((ContentEvent) 
envelope.getMessage());
+       }
+       
+       /*
+        * SerializationProxy
+        */
+       private Object writeReplace() {
+               return new SerializationProxy(this);
+       }
+       
+       private static class SerializationProxy implements Serializable {
+               /**
+                * 
+                */
+               private static final long serialVersionUID = 
1534643987559070336L;
+               
+               private Processor processor;
+               private List<SamzaStream> outputStreams;
+               
+               public SerializationProxy(SamzaProcessingItem pi) {
+                       this.processor = pi.getProcessor();
+                       this.outputStreams = pi.getOutputStreams();
+               }
+       }
+
+}
\ No newline at end of file

Reply via email to