http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
deleted file mode 100644
index e1d4754..0000000
--- 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-
-// TODO adapt this entrance processing item to connect to external streams so 
the application doesnt need to use an AdapterApp
-
-public class S4EntranceProcessingItem extends ProcessingElement implements 
EntranceProcessingItem {
-
-  private EntranceProcessor entranceProcessor;
-  // private S4DoTask app;
-  private int parallelism;
-  protected Stream outputStream;
-
-  /**
-   * Constructor of an S4 entrance processing item.
-   * 
-   * @param app
-   *          : S4 application
-   */
-  public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App 
app) {
-    super(app);
-    this.entranceProcessor = entranceProcessor;
-    // this.app = (S4DoTask) app;
-    // this.setSingleton(true);
-  }
-
-  public void setParallelism(int parallelism) {
-    this.parallelism = parallelism;
-  }
-
-  public int getParallelism() {
-    return this.parallelism;
-  }
-
-  @Override
-  public EntranceProcessor getProcessor() {
-    return this.entranceProcessor;
-  }
-
-  //
-  // @Override
-  // public void put(Instance inst) {
-  // // do nothing
-  // // may not needed
-  // }
-
-  @Override
-  protected void onCreate() {
-    // was commented
-    if (this.entranceProcessor != null) {
-      // TODO revisit if we need to change it to a clone() call
-      this.entranceProcessor = (EntranceProcessor) 
this.entranceProcessor.newProcessor(this.entranceProcessor);
-      this.entranceProcessor.onCreate(Integer.parseInt(getId()));
-    }
-  }
-
-  @Override
-  protected void onRemove() {
-    // do nothing
-  }
-
-  //
-  // /**
-  // * Sets the entrance processing item processor.
-  // *
-  // * @param processor
-  // */
-  // public void setProcessor(Processor processor) {
-  // this.entranceProcessor = processor;
-  // }
-
-  @Override
-  public void setName(String name) {
-    super.setName(name);
-  }
-
-  @Override
-  public EntranceProcessingItem setOutputStream(Stream stream) {
-    if (this.outputStream != null)
-      throw new IllegalStateException("Output stream for an EntrancePI sohuld 
be initialized only once");
-    this.outputStream = stream;
-    return this;
-  }
-
-  public boolean injectNextEvent() {
-    if (entranceProcessor.hasNext()) {
-      ContentEvent nextEvent = this.entranceProcessor.nextEvent();
-      outputStream.put(nextEvent);
-      return entranceProcessor.hasNext();
-    } else
-      return false;
-    // return !nextEvent.isLastEvent();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
deleted file mode 100644
index 8ef137c..0000000
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import net.jcip.annotations.Immutable;
-
-import org.apache.s4.base.Event;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * The Class InstanceEvent.
- */
-@Immutable
-final public class S4Event extends Event {
-
-  private String key;
-
-  public String getKey() {
-    return key;
-  }
-
-  public void setKey(String key) {
-    this.key = key;
-  }
-
-  /** The content event. */
-  private ContentEvent contentEvent;
-
-  /**
-   * Instantiates a new instance event.
-   */
-  public S4Event() {
-    // Needed for serialization of kryo
-  }
-
-  /**
-   * Instantiates a new instance event.
-   * 
-   * @param contentEvent
-   *          the content event
-   */
-  public S4Event(ContentEvent contentEvent) {
-    if (contentEvent != null) {
-      this.contentEvent = contentEvent;
-      this.key = contentEvent.getKey();
-
-    }
-  }
-
-  /**
-   * Gets the content event.
-   * 
-   * @return the content event
-   */
-  public ContentEvent getContentEvent() {
-    return contentEvent;
-  }
-
-  /**
-   * Sets the content event.
-   * 
-   * @param contentEvent
-   *          the new content event
-   */
-  public void setContentEvent(ContentEvent contentEvent) {
-    this.contentEvent = contentEvent;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
deleted file mode 100644
index ea88094..0000000
--- 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ProcessingItem.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * S4 Platform platform specific processing item, inherits from S4 
ProcessinElemnt.
- * 
- * @author severien
- * 
- */
-public class S4ProcessingItem extends ProcessingElement implements
-    ProcessingItem {
-
-  public static final Logger logger = LoggerFactory
-      .getLogger(S4ProcessingItem.class);
-
-  private Processor processor;
-  private int paralellismLevel;
-  private S4DoTask app;
-
-  private static final String NAME = "PROCESSING-ITEM-";
-  private static int OBJ_COUNTER = 0;
-
-  /**
-   * Constructor of S4 ProcessingItem.
-   * 
-   * @param app
-   *          : S4 application
-   */
-  public S4ProcessingItem(App app) {
-    super(app);
-    super.setName(NAME + OBJ_COUNTER);
-    OBJ_COUNTER++;
-    this.app = (S4DoTask) app;
-    this.paralellismLevel = 1;
-  }
-
-  @Override
-  public String getName() {
-    return super.getName();
-  }
-
-  /**
-   * Gets processing item paralellism level.
-   * 
-   * @return int
-   */
-  public int getParalellismLevel() {
-    return paralellismLevel;
-  }
-
-  /**
-   * Sets processing item paralellism level.
-   * 
-   * @param paralellismLevel
-   */
-  public void setParalellismLevel(int paralellismLevel) {
-    this.paralellismLevel = paralellismLevel;
-  }
-
-  /**
-   * onEvent method.
-   * 
-   * @param event
-   */
-  public void onEvent(S4Event event) {
-    if (processor.process(event.getContentEvent()) == true) {
-      close();
-    }
-  }
-
-  /**
-   * Sets S4 processing item processor.
-   * 
-   * @param processor
-   */
-  public void setProcessor(Processor processor) {
-    this.processor = processor;
-  }
-
-  // Methods from ProcessingItem
-  @Override
-  public Processor getProcessor() {
-    return processor;
-  }
-
-  /**
-   * KeyFinder sets the keys for a specific event.
-   * 
-   * @return KeyFinder
-   */
-  private KeyFinder<S4Event> getKeyFinder() {
-    KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() {
-      @Override
-      public List<String> get(S4Event s4event) {
-        List<String> results = new ArrayList<String>();
-        results.add(s4event.getKey());
-        return results;
-      }
-    };
-
-    return keyFinder;
-  }
-
-  @Override
-  public ProcessingItem connectInputAllStream(Stream inputStream) {
-
-    S4Stream stream = (S4Stream) inputStream;
-    stream.setParallelism(this.paralellismLevel);
-    stream.addStream(inputStream.getStreamId(),
-        getKeyFinder(), this, S4Stream.BROADCAST);
-    return this;
-  }
-
-  @Override
-  public ProcessingItem connectInputKeyStream(Stream inputStream) {
-
-    S4Stream stream = (S4Stream) inputStream;
-    stream.setParallelism(this.paralellismLevel);
-    stream.addStream(inputStream.getStreamId(),
-        getKeyFinder(), this, S4Stream.GROUP_BY_KEY);
-
-    return this;
-  }
-
-  @Override
-  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
-    S4Stream stream = (S4Stream) inputStream;
-    stream.setParallelism(this.paralellismLevel);
-    stream.addStream(inputStream.getStreamId(),
-        getKeyFinder(), this, S4Stream.SHUFFLE);
-
-    return this;
-  }
-
-  // Methods from ProcessingElement
-  @Override
-  protected void onCreate() {
-    logger.debug("PE ID {}", getId());
-    if (this.processor != null) {
-      this.processor = this.processor.newProcessor(this.processor);
-      this.processor.onCreate(Integer.parseInt(getId()));
-    }
-  }
-
-  @Override
-  protected void onRemove() {
-    // do nothing
-  }
-
-  @Override
-  public int getParallelism() {
-    return this.paralellismLevel;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
deleted file mode 100644
index 46c979c..0000000
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Stream.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.s4.base.KeyFinder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.topology.AbstractStream;
-
-/**
- * S4 Platform specific stream.
- * 
- * @author severien
- * 
- */
-public class S4Stream extends AbstractStream {
-
-  public static final int SHUFFLE = 0;
-  public static final int GROUP_BY_KEY = 1;
-  public static final int BROADCAST = 2;
-
-  private static final Logger logger = LoggerFactory.getLogger(S4Stream.class);
-
-  private S4DoTask app;
-  private int processingItemParalellism;
-  private int shuffleCounter;
-
-  private static final String NAME = "STREAM-";
-  private static int OBJ_COUNTER = 0;
-
-  /* The stream list */
-  public List<StreamType> streams;
-
-  public S4Stream(S4DoTask app) {
-    super();
-    this.app = app;
-    this.processingItemParalellism = 1;
-    this.shuffleCounter = 0;
-    this.streams = new ArrayList<StreamType>();
-    this.setStreamId(NAME + OBJ_COUNTER);
-    OBJ_COUNTER++;
-  }
-
-  public S4Stream(S4DoTask app, S4ProcessingItem pi) {
-    super();
-    this.app = app;
-    this.processingItemParalellism = 1;
-    this.shuffleCounter = 0;
-    this.streams = new ArrayList<StreamType>();
-    this.setStreamId(NAME + OBJ_COUNTER);
-    OBJ_COUNTER++;
-
-  }
-
-  /**
-   * 
-   * @return
-   */
-  public int getParallelism() {
-    return processingItemParalellism;
-  }
-
-  public void setParallelism(int parallelism) {
-    this.processingItemParalellism = parallelism;
-  }
-
-  public void addStream(String streamID, KeyFinder<S4Event> finder,
-      S4ProcessingItem pi, int type) {
-    String streamName = streamID + "_" + pi.getName();
-    org.apache.s4.core.Stream<S4Event> stream = this.app.createStream(
-        streamName, pi);
-    stream.setName(streamName);
-    logger.debug("Stream name S4Stream {}", streamName);
-    if (finder != null)
-      stream.setKey(finder);
-    this.streams.add(new StreamType(stream, type));
-
-  }
-
-  @Override
-  public void put(ContentEvent event) {
-
-    for (int i = 0; i < streams.size(); i++) {
-
-      switch (streams.get(i).getType()) {
-      case SHUFFLE:
-        S4Event s4event = new S4Event(event);
-        s4event.setStreamId(streams.get(i).getStream().getName());
-        if (getParallelism() == 1) {
-          s4event.setKey("0");
-        } else {
-          s4event.setKey(Integer.toString(shuffleCounter));
-        }
-        streams.get(i).getStream().put(s4event);
-        shuffleCounter++;
-        if (shuffleCounter >= (getParallelism())) {
-          shuffleCounter = 0;
-        }
-
-        break;
-
-      case GROUP_BY_KEY:
-        S4Event s4event1 = new S4Event(event);
-        s4event1.setStreamId(streams.get(i).getStream().getName());
-        HashCodeBuilder hb = new HashCodeBuilder();
-        hb.append(event.getKey());
-        String key = Integer.toString(hb.build() % getParallelism());
-        s4event1.setKey(key);
-        streams.get(i).getStream().put(s4event1);
-        break;
-
-      case BROADCAST:
-        for (int p = 0; p < this.getParallelism(); p++) {
-          S4Event s4event2 = new S4Event(event);
-          s4event2.setStreamId(streams.get(i).getStream().getName());
-          s4event2.setKey(Integer.toString(p));
-          streams.get(i).getStream().put(s4event2);
-        }
-        break;
-
-      default:
-        break;
-      }
-
-    }
-
-  }
-
-  /**
-   * Subclass for definig stream connection type
-   * 
-   * @author severien
-   * 
-   */
-  class StreamType {
-    org.apache.s4.core.Stream<S4Event> stream;
-    int type;
-
-    public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
-      this.stream = s;
-      this.type = t;
-    }
-
-    public org.apache.s4.core.Stream<S4Event> getStream() {
-      return stream;
-    }
-
-    public void setStream(org.apache.s4.core.Stream<S4Event> stream) {
-      this.stream = stream;
-    }
-
-    public int getType() {
-      return type;
-    }
-
-    public void setType(int type) {
-      this.type = type;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
deleted file mode 100644
index f6bca87..0000000
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Submitter.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.core.util.ParsingUtils;
-import org.apache.s4.deploy.DeploymentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.tasks.Task;
-import com.yahoo.labs.samoa.topology.ISubmitter;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-
-public class S4Submitter implements ISubmitter {
-
-  private static Logger logger = LoggerFactory.getLogger(S4Submitter.class);
-
-  @Override
-  public void deployTask(Task task) {
-    // TODO: Get application FROM HTTP server
-    // TODO: Initializa a http server to serve the app package
-
-    String appURIString = null;
-    // File app = new File(System.getProperty("user.dir")
-    // + "/src/site/dist/SAMOA-S4-0.1-dist.jar");
-
-    // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar
-    try {
-      URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar";);
-      appURIString = appURL.toString();
-    } catch (MalformedURLException e1) {
-      e1.printStackTrace();
-    }
-
-    // try {
-    // appURIString = app.toURI().toURL().toString();
-    // } catch (MalformedURLException e) {
-    // e.printStackTrace();
-    // }
-    if (task == null) {
-      logger.error("Can't execute since evaluation task is not set!");
-      return;
-    } else {
-      logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ",
-          task.getClass().getSimpleName(), appURIString);
-    }
-
-    String[] args = { "-c=testCluster2",
-        "-appClass=" + S4DoTask.class.getName(),
-        "-appName=" + "samoaApp",
-        "-p=evalTask=" + task.getClass().getSimpleName(),
-        "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + 
SamoaSerializerModule.class.getName() };
-    // "-emc=" + S4MOAModule.class.getName(),
-    // "@" +
-    // Resources.getResource("s4moa.properties").getFile(),
-
-    S4Config s4config = new S4Config();
-    JCommander jc = new JCommander(s4config);
-    jc.parse(args);
-
-    Map<String, String> namedParameters = new HashMap<String, String>();
-    for (String parameter : s4config.namedParameters) {
-      String[] param = parameter.split("=");
-      namedParameters.put(param[0], param[1]);
-    }
-
-    AppConfig config = new AppConfig.Builder()
-        .appClassName(s4config.appClass).appName(s4config.appName)
-        .appURI(s4config.appURI).namedParameters(namedParameters)
-        .build();
-
-    DeploymentUtils.initAppConfig(config, s4config.clusterName, true,
-        s4config.zkString);
-
-    System.out.println("Suposedly deployed on S4");
-  }
-
-  public void initHTTPServer() {
-
-  }
-
-  @Parameters(separators = "=")
-  public static class S4Config {
-
-    @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", 
required = true)
-    String clusterName = null;
-
-    @Parameter(names = "-appClass", description = "Main App class", required = 
false)
-    String appClass = null;
-
-    @Parameter(names = "-appName", description = "Application name", required 
= false)
-    String appName = null;
-
-    @Parameter(names = "-s4r", description = "Application URI", required = 
false)
-    String appURI = null;
-
-    @Parameter(names = "-zk", description = "ZooKeeper connection string", 
required = false)
-    String zkString = null;
-
-    @Parameter(names = { "-extraModulesClasses", "-emc" }, description = 
"Comma-separated list of additional configuration modules (they will be 
instantiated through their constructor without arguments).", required = false)
-    List<String> extraModules = new ArrayList<String>();
-
-    @Parameter(names = { "-p", "-namedStringParameters" }, description = 
"Comma-separated list of inline configuration "
-        + "parameters, taking precedence over homonymous configuration 
parameters from configuration files. "
-        + "Syntax: '-p=name1=value1,name2=value2 '", required = false, 
converter = ParsingUtils.InlineConfigParameterConverter.class)
-    List<String> namedParameters = new ArrayList<String>();
-
-  }
-
-  @Override
-  public void setLocal(boolean bool) {
-    // TODO S4 works the same for local and distributed environments
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
deleted file mode 100644
index 5eb0ccf..0000000
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Topology.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.AbstractTopology;
-
-public class S4Topology extends AbstractTopology {
-
-  // CASEY: it seems evaluationTask is not used.
-  // Remove it for now
-
-  // private String _evaluationTask;
-
-  // S4Topology(String topoName, String evalTask) {
-  // super(topoName);
-  // }
-  //
-  // S4Topology(String topoName) {
-  // this(topoName, null);
-  // }
-
-  // @Override
-  // public void setEvaluationTask(String evalTask) {
-  // _evaluationTask = evalTask;
-  // }
-  //
-  // @Override
-  // public String getEvaluationTask() {
-  // return _evaluationTask;
-  // }
-
-  S4Topology(String topoName) {
-    super(topoName);
-  }
-
-  protected EntranceProcessingItem getEntranceProcessingItem() {
-    if (this.getEntranceProcessingItems() == null)
-      return null;
-    if (this.getEntranceProcessingItems().size() < 1)
-      return null;
-    // TODO: support multiple entrance PIs
-    return (EntranceProcessingItem) 
this.getEntranceProcessingItems().toArray()[0];
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
deleted file mode 100644
index b809284..0000000
--- 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializer.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.nio.ByteBuffer;
-
-import org.apache.s4.base.SerializerDeserializer;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.yahoo.labs.samoa.learners.classifiers.trees.AttributeContentEvent;
-import com.yahoo.labs.samoa.learners.classifiers.trees.ComputeContentEvent;
-
-public class SamoaSerializer implements SerializerDeserializer {
-
-  private ThreadLocal<Kryo> kryoThreadLocal;
-  private ThreadLocal<Output> outputThreadLocal;
-
-  private int initialBufferSize = 2048;
-  private int maxBufferSize = 256 * 1024;
-
-  public void setMaxBufferSize(int maxBufferSize) {
-    this.maxBufferSize = maxBufferSize;
-  }
-
-  /**
-   * 
-   * @param classLoader
-   *          classloader able to handle classes to serialize/deserialize. For 
instance, application-level events can
-   *          only be handled by the application classloader.
-   */
-  @Inject
-  public SamoaSerializer(@Assisted final ClassLoader classLoader) {
-    kryoThreadLocal = new ThreadLocal<Kryo>() {
-
-      @Override
-      protected Kryo initialValue() {
-        Kryo kryo = new Kryo();
-        kryo.setClassLoader(classLoader);
-        kryo.register(AttributeContentEvent.class, new 
AttributeContentEvent.AttributeCEFullPrecSerializer());
-        kryo.register(ComputeContentEvent.class, new 
ComputeContentEvent.ComputeCEFullPrecSerializer());
-        kryo.setRegistrationRequired(false);
-        return kryo;
-      }
-    };
-
-    outputThreadLocal = new ThreadLocal<Output>() {
-      @Override
-      protected Output initialValue() {
-        Output output = new Output(initialBufferSize, maxBufferSize);
-        return output;
-      }
-    };
-
-  }
-
-  @Override
-  public Object deserialize(ByteBuffer rawMessage) {
-    Input input = new Input(rawMessage.array());
-    try {
-      return kryoThreadLocal.get().readClassAndObject(input);
-    } finally {
-      input.close();
-    }
-  }
-
-  @SuppressWarnings("resource")
-  @Override
-  public ByteBuffer serialize(Object message) {
-    Output output = outputThreadLocal.get();
-    try {
-      kryoThreadLocal.get().writeClassAndObject(output, message);
-      return ByteBuffer.wrap(output.toBytes());
-    } finally {
-      output.clear();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
deleted file mode 100644
index d47b143..0000000
--- 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSerializerModule.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.s4.base.SerializerDeserializer;
-
-import com.google.inject.AbstractModule;
-
-public class SamoaSerializerModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(SerializerDeserializer.class).to(SamoaSerializer.class);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java
new file mode 100644
index 0000000..ebd18f9
--- /dev/null
+++ 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java
@@ -0,0 +1,97 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * S4 Platform Component Factory
+ * 
+ * @author severien
+ * 
+ */
+public class S4ComponentFactory implements ComponentFactory {
+
+  public static final Logger logger = 
LoggerFactory.getLogger(S4ComponentFactory.class);
+  protected S4DoTask app;
+
+  @Override
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    S4ProcessingItem processingItem = new S4ProcessingItem(app);
+    // TODO refactor how to set the paralellism level
+    processingItem.setParalellismLevel(paralellism);
+    processingItem.setProcessor(processor);
+
+    return processingItem;
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
+
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+    // TODO Create source Entry processing item that connects to an external
+    // stream
+    S4EntranceProcessingItem entrancePi = new 
S4EntranceProcessingItem(entranceProcessor, app);
+    entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
+    return entrancePi;
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    S4Stream aStream = new S4Stream(app);
+    return aStream;
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new S4Topology(topoName);
+  }
+
+  /**
+   * Initialization method.
+   * 
+   * @param evalTask
+   */
+  public void init(String evalTask) {
+    // Task is initiated in the DoTaskApp
+  }
+
+  /**
+   * Sets S4 application.
+   * 
+   * @param app
+   */
+  public void setApp(S4DoTask app) {
+    this.app = app;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java
new file mode 100644
index 0000000..d52e981
--- /dev/null
+++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java
@@ -0,0 +1,268 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.s4.core.Stream;
+import org.apache.samoa.core.Globals;
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ComponentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Option;
+import com.github.javacliparser.ClassOption;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/*
+ * S4 App that runs samoa Tasks
+ *
+ * */
+
+/**
+ * The Class DoTaskApp.
+ */
+final public class S4DoTask extends App {
+
+  private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
+  Task task;
+
+  @Inject
+  @Named("evalTask")
+  public String evalTask;
+
+  public S4DoTask() {
+    super();
+  }
+
+  /** The engine. */
+  protected ComponentFactory componentFactory;
+
+  /**
+   * Gets the factory.
+   * 
+   * @return the factory
+   */
+  public ComponentFactory getFactory() {
+    return componentFactory;
+  }
+
+  /**
+   * Sets the factory.
+   * 
+   * @param factory
+   *          the new factory
+   */
+  public void setFactory(ComponentFactory factory) {
+    this.componentFactory = factory;
+  }
+
+  /*
+   * Build the application
+   * 
+   * @see org.apache.s4.core.App#onInit()
+   */
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#onInit()
+   */
+  @Override
+  protected void onInit() {
+    logger.info("DoTaskApp onInit");
+    // ConsoleReporters prints S4 metrics
+    // MetricsRegistry mr = new MetricsRegistry();
+    //
+    // CsvReporter.enable(new File(System.getProperty("user.home")
+    // + "/monitor/"), 10, TimeUnit.SECONDS);
+    // ConsoleReporter.enable(10, TimeUnit.SECONDS);
+    try {
+      System.err.println();
+      System.err.println(Globals.getWorkbenchInfoString());
+      System.err.println();
+
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+    S4ComponentFactory factory = new S4ComponentFactory();
+    factory.setApp(this);
+
+    // logger.debug("LC {}", lc);
+
+    // task = TaskProvider.getTask(evalTask);
+
+    // EXAMPLE OPTIONS
+    // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
+    // 5 -N 0.0)
+    // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
+    // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
+    // "-K", "5", "-N", "0.0)"};
+    // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
+    // "-g", "clustream.Clustream", "-i", "100000", "-s",
+    // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
+    logger.debug("PARAMETERS {}", evalTask);
+    // params = params.replace(":", " ");
+    List<String> parameters = new ArrayList<String>();
+    // parameters.add(evalTask);
+    try {
+      parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, 
"UTF-8").split(" ")));
+    } catch (UnsupportedEncodingException ex) {
+      ex.printStackTrace();
+    }
+    String[] args = parameters.toArray(new String[0]);
+    Option[] extraOptions = new Option[] {};
+    // build a single string by concatenating cli options
+    StringBuilder cliString = new StringBuilder();
+    for (int i = 0; i < args.length; i++) {
+      cliString.append(" ").append(args[i]);
+    }
+
+    // parse options
+    try {
+      task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, extraOptions);
+      task.setFactory(factory);
+      task.init();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#onStart()
+   */
+  @Override
+  protected void onStart() {
+    logger.info("Starting DoTaskApp... App Partition [{}]", 
this.getPartitionId());
+    // <<<<<<< HEAD Task doesn't have start in latest storm-impl
+    // TODO change the way the app starts
+    // if (this.getPartitionId() == 0)
+    S4Topology s4topology = (S4Topology) getTask().getTopology();
+    S4EntranceProcessingItem epi = (S4EntranceProcessingItem) 
s4topology.getEntranceProcessingItem();
+    while (epi.injectNextEvent())
+      // inject events from the EntrancePI
+      ;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#onClose()
+   */
+  @Override
+  protected void onClose() {
+    System.out.println("Closing DoTaskApp...");
+
+  }
+
+  /**
+   * Gets the task.
+   * 
+   * @return the task
+   */
+  public Task getTask() {
+    return task;
+  }
+
+  // These methods are protected in App and can not be accessed from outside.
+  // They are
+  // called from parallel classifiers and evaluations. Is there a better way
+  // to do that?
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#createPE(java.lang.Class)
+   */
+  @Override
+  public <T extends ProcessingElement> T createPE(Class<T> type) {
+    return super.createPE(type);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#createStream(java.lang.String,
+   * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
+   */
+  @Override
+  public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> 
finder,
+      ProcessingElement... processingElements) {
+    return super.createStream(name, finder, processingElements);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.App#createStream(java.lang.String,
+   * org.apache.s4.core.ProcessingElement[])
+   */
+  @Override
+  public <T extends Event> Stream<T> createStream(String name, 
ProcessingElement... processingElements) {
+    return super.createStream(name, processingElements);
+  }
+
+  // @com.beust.jcommander.Parameters(separators = "=")
+  // class Parameters {
+  //
+  // @Parameter(names={"-lc","-local"}, description="Local clustering method")
+  // private String localClustering;
+  //
+  // @Parameter(names={"-gc","-global"},
+  // description="Global clustering method")
+  // private String globalClustering;
+  //
+  // }
+  //
+  // class ParametersConverter {// implements IStringConverter<String[]> {
+  //
+  //
+  // public String[] convertToArgs(String value) {
+  //
+  // String[] params = value.split(",");
+  // String[] args = new String[params.length*2];
+  // for(int i=0; i<params.length ; i++) {
+  // args[i] = params[i].split("=")[0];
+  // args[i+1] = params[i].split("=")[1];
+  // i++;
+  // }
+  // return args;
+  // }
+  //
+  // }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
new file mode 100644
index 0000000..771cbc8
--- /dev/null
+++ 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
@@ -0,0 +1,119 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.Stream;
+
+// TODO adapt this entrance processing item to connect to external streams so 
the application doesnt need to use an AdapterApp
+
+public class S4EntranceProcessingItem extends ProcessingElement implements 
EntranceProcessingItem {
+
+  private EntranceProcessor entranceProcessor;
+  // private S4DoTask app;
+  private int parallelism;
+  protected Stream outputStream;
+
+  /**
+   * Constructor of an S4 entrance processing item.
+   * 
+   * @param app
+   *          : S4 application
+   */
+  public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App 
app) {
+    super(app);
+    this.entranceProcessor = entranceProcessor;
+    // this.app = (S4DoTask) app;
+    // this.setSingleton(true);
+  }
+
+  public void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  @Override
+  public EntranceProcessor getProcessor() {
+    return this.entranceProcessor;
+  }
+
+  //
+  // @Override
+  // public void put(Instance inst) {
+  // // do nothing
+  // // may not needed
+  // }
+
+  @Override
+  protected void onCreate() {
+    // was commented
+    if (this.entranceProcessor != null) {
+      // TODO revisit if we need to change it to a clone() call
+      this.entranceProcessor = (EntranceProcessor) 
this.entranceProcessor.newProcessor(this.entranceProcessor);
+      this.entranceProcessor.onCreate(Integer.parseInt(getId()));
+    }
+  }
+
+  @Override
+  protected void onRemove() {
+    // do nothing
+  }
+
+  //
+  // /**
+  // * Sets the entrance processing item processor.
+  // *
+  // * @param processor
+  // */
+  // public void setProcessor(Processor processor) {
+  // this.entranceProcessor = processor;
+  // }
+
+  @Override
+  public void setName(String name) {
+    super.setName(name);
+  }
+
+  @Override
+  public EntranceProcessingItem setOutputStream(Stream stream) {
+    if (this.outputStream != null)
+      throw new IllegalStateException("Output stream for an EntrancePI sohuld 
be initialized only once");
+    this.outputStream = stream;
+    return this;
+  }
+
+  public boolean injectNextEvent() {
+    if (entranceProcessor.hasNext()) {
+      ContentEvent nextEvent = this.entranceProcessor.nextEvent();
+      outputStream.put(nextEvent);
+      return entranceProcessor.hasNext();
+    } else
+      return false;
+    // return !nextEvent.isLastEvent();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java
new file mode 100644
index 0000000..154715b
--- /dev/null
+++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java
@@ -0,0 +1,91 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import net.jcip.annotations.Immutable;
+
+import org.apache.s4.base.Event;
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * The Class InstanceEvent.
+ */
+@Immutable
+final public class S4Event extends Event {
+
+  private String key;
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  /** The content event. */
+  private ContentEvent contentEvent;
+
+  /**
+   * Instantiates a new instance event.
+   */
+  public S4Event() {
+    // Needed for serialization of kryo
+  }
+
+  /**
+   * Instantiates a new instance event.
+   * 
+   * @param contentEvent
+   *          the content event
+   */
+  public S4Event(ContentEvent contentEvent) {
+    if (contentEvent != null) {
+      this.contentEvent = contentEvent;
+      this.key = contentEvent.getKey();
+
+    }
+  }
+
+  /**
+   * Gets the content event.
+   * 
+   * @return the content event
+   */
+  public ContentEvent getContentEvent() {
+    return contentEvent;
+  }
+
+  /**
+   * Sets the content event.
+   * 
+   * @param contentEvent
+   *          the new content event
+   */
+  public void setContentEvent(ContentEvent contentEvent) {
+    this.contentEvent = contentEvent;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java
new file mode 100644
index 0000000..b9c7467
--- /dev/null
+++ 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java
@@ -0,0 +1,186 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.s4.base.KeyFinder;
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * S4 Platform platform specific processing item, inherits from S4 
ProcessinElemnt.
+ * 
+ * @author severien
+ * 
+ */
+public class S4ProcessingItem extends ProcessingElement implements
+    ProcessingItem {
+
+  public static final Logger logger = LoggerFactory
+      .getLogger(S4ProcessingItem.class);
+
+  private Processor processor;
+  private int paralellismLevel;
+  private S4DoTask app;
+
+  private static final String NAME = "PROCESSING-ITEM-";
+  private static int OBJ_COUNTER = 0;
+
+  /**
+   * Constructor of S4 ProcessingItem.
+   * 
+   * @param app
+   *          : S4 application
+   */
+  public S4ProcessingItem(App app) {
+    super(app);
+    super.setName(NAME + OBJ_COUNTER);
+    OBJ_COUNTER++;
+    this.app = (S4DoTask) app;
+    this.paralellismLevel = 1;
+  }
+
+  @Override
+  public String getName() {
+    return super.getName();
+  }
+
+  /**
+   * Gets processing item paralellism level.
+   * 
+   * @return int
+   */
+  public int getParalellismLevel() {
+    return paralellismLevel;
+  }
+
+  /**
+   * Sets processing item paralellism level.
+   * 
+   * @param paralellismLevel
+   */
+  public void setParalellismLevel(int paralellismLevel) {
+    this.paralellismLevel = paralellismLevel;
+  }
+
+  /**
+   * onEvent method.
+   * 
+   * @param event
+   */
+  public void onEvent(S4Event event) {
+    if (processor.process(event.getContentEvent()) == true) {
+      close();
+    }
+  }
+
+  /**
+   * Sets S4 processing item processor.
+   * 
+   * @param processor
+   */
+  public void setProcessor(Processor processor) {
+    this.processor = processor;
+  }
+
+  // Methods from ProcessingItem
+  @Override
+  public Processor getProcessor() {
+    return processor;
+  }
+
+  /**
+   * KeyFinder sets the keys for a specific event.
+   * 
+   * @return KeyFinder
+   */
+  private KeyFinder<S4Event> getKeyFinder() {
+    KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() {
+      @Override
+      public List<String> get(S4Event s4event) {
+        List<String> results = new ArrayList<String>();
+        results.add(s4event.getKey());
+        return results;
+      }
+    };
+
+    return keyFinder;
+  }
+
+  @Override
+  public ProcessingItem connectInputAllStream(Stream inputStream) {
+
+    S4Stream stream = (S4Stream) inputStream;
+    stream.setParallelism(this.paralellismLevel);
+    stream.addStream(inputStream.getStreamId(),
+        getKeyFinder(), this, S4Stream.BROADCAST);
+    return this;
+  }
+
+  @Override
+  public ProcessingItem connectInputKeyStream(Stream inputStream) {
+
+    S4Stream stream = (S4Stream) inputStream;
+    stream.setParallelism(this.paralellismLevel);
+    stream.addStream(inputStream.getStreamId(),
+        getKeyFinder(), this, S4Stream.GROUP_BY_KEY);
+
+    return this;
+  }
+
+  @Override
+  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+    S4Stream stream = (S4Stream) inputStream;
+    stream.setParallelism(this.paralellismLevel);
+    stream.addStream(inputStream.getStreamId(),
+        getKeyFinder(), this, S4Stream.SHUFFLE);
+
+    return this;
+  }
+
+  // Methods from ProcessingElement
+  @Override
+  protected void onCreate() {
+    logger.debug("PE ID {}", getId());
+    if (this.processor != null) {
+      this.processor = this.processor.newProcessor(this.processor);
+      this.processor.onCreate(Integer.parseInt(getId()));
+    }
+  }
+
+  @Override
+  protected void onRemove() {
+    // do nothing
+  }
+
+  @Override
+  public int getParallelism() {
+    return this.paralellismLevel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java
new file mode 100644
index 0000000..734462e
--- /dev/null
+++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java
@@ -0,0 +1,184 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.s4.base.KeyFinder;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * S4 Platform specific stream.
+ * 
+ * @author severien
+ * 
+ */
+public class S4Stream extends AbstractStream {
+
+  public static final int SHUFFLE = 0;
+  public static final int GROUP_BY_KEY = 1;
+  public static final int BROADCAST = 2;
+
+  private static final Logger logger = LoggerFactory.getLogger(S4Stream.class);
+
+  private S4DoTask app;
+  private int processingItemParalellism;
+  private int shuffleCounter;
+
+  private static final String NAME = "STREAM-";
+  private static int OBJ_COUNTER = 0;
+
+  /* The stream list */
+  public List<StreamType> streams;
+
+  public S4Stream(S4DoTask app) {
+    super();
+    this.app = app;
+    this.processingItemParalellism = 1;
+    this.shuffleCounter = 0;
+    this.streams = new ArrayList<StreamType>();
+    this.setStreamId(NAME + OBJ_COUNTER);
+    OBJ_COUNTER++;
+  }
+
+  public S4Stream(S4DoTask app, S4ProcessingItem pi) {
+    super();
+    this.app = app;
+    this.processingItemParalellism = 1;
+    this.shuffleCounter = 0;
+    this.streams = new ArrayList<StreamType>();
+    this.setStreamId(NAME + OBJ_COUNTER);
+    OBJ_COUNTER++;
+
+  }
+
+  /**
+   * 
+   * @return
+   */
+  public int getParallelism() {
+    return processingItemParalellism;
+  }
+
+  public void setParallelism(int parallelism) {
+    this.processingItemParalellism = parallelism;
+  }
+
+  public void addStream(String streamID, KeyFinder<S4Event> finder,
+      S4ProcessingItem pi, int type) {
+    String streamName = streamID + "_" + pi.getName();
+    org.apache.s4.core.Stream<S4Event> stream = this.app.createStream(
+        streamName, pi);
+    stream.setName(streamName);
+    logger.debug("Stream name S4Stream {}", streamName);
+    if (finder != null)
+      stream.setKey(finder);
+    this.streams.add(new StreamType(stream, type));
+
+  }
+
+  @Override
+  public void put(ContentEvent event) {
+
+    for (int i = 0; i < streams.size(); i++) {
+
+      switch (streams.get(i).getType()) {
+      case SHUFFLE:
+        S4Event s4event = new S4Event(event);
+        s4event.setStreamId(streams.get(i).getStream().getName());
+        if (getParallelism() == 1) {
+          s4event.setKey("0");
+        } else {
+          s4event.setKey(Integer.toString(shuffleCounter));
+        }
+        streams.get(i).getStream().put(s4event);
+        shuffleCounter++;
+        if (shuffleCounter >= (getParallelism())) {
+          shuffleCounter = 0;
+        }
+
+        break;
+
+      case GROUP_BY_KEY:
+        S4Event s4event1 = new S4Event(event);
+        s4event1.setStreamId(streams.get(i).getStream().getName());
+        HashCodeBuilder hb = new HashCodeBuilder();
+        hb.append(event.getKey());
+        String key = Integer.toString(hb.build() % getParallelism());
+        s4event1.setKey(key);
+        streams.get(i).getStream().put(s4event1);
+        break;
+
+      case BROADCAST:
+        for (int p = 0; p < this.getParallelism(); p++) {
+          S4Event s4event2 = new S4Event(event);
+          s4event2.setStreamId(streams.get(i).getStream().getName());
+          s4event2.setKey(Integer.toString(p));
+          streams.get(i).getStream().put(s4event2);
+        }
+        break;
+
+      default:
+        break;
+      }
+
+    }
+
+  }
+
+  /**
+   * Subclass for definig stream connection type
+   * 
+   * @author severien
+   * 
+   */
+  class StreamType {
+    org.apache.s4.core.Stream<S4Event> stream;
+    int type;
+
+    public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
+      this.stream = s;
+      this.type = t;
+    }
+
+    public org.apache.s4.core.Stream<S4Event> getStream() {
+      return stream;
+    }
+
+    public void setStream(org.apache.s4.core.Stream<S4Event> stream) {
+      this.stream = stream;
+    }
+
+    public int getType() {
+      return type;
+    }
+
+    public void setType(int type) {
+      this.type = type;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java
new file mode 100644
index 0000000..22807a6
--- /dev/null
+++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java
@@ -0,0 +1,144 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.s4.core.util.AppConfig;
+import org.apache.s4.core.util.ParsingUtils;
+import org.apache.s4.deploy.DeploymentUtils;
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ISubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class S4Submitter implements ISubmitter {
+
+  private static Logger logger = LoggerFactory.getLogger(S4Submitter.class);
+
+  @Override
+  public void deployTask(Task task) {
+    // TODO: Get application FROM HTTP server
+    // TODO: Initializa a http server to serve the app package
+
+    String appURIString = null;
+    // File app = new File(System.getProperty("user.dir")
+    // + "/src/site/dist/SAMOA-S4-0.1-dist.jar");
+
+    // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar
+    try {
+      URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar";);
+      appURIString = appURL.toString();
+    } catch (MalformedURLException e1) {
+      e1.printStackTrace();
+    }
+
+    // try {
+    // appURIString = app.toURI().toURL().toString();
+    // } catch (MalformedURLException e) {
+    // e.printStackTrace();
+    // }
+    if (task == null) {
+      logger.error("Can't execute since evaluation task is not set!");
+      return;
+    } else {
+      logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ",
+          task.getClass().getSimpleName(), appURIString);
+    }
+
+    String[] args = { "-c=testCluster2",
+        "-appClass=" + S4DoTask.class.getName(),
+        "-appName=" + "samoaApp",
+        "-p=evalTask=" + task.getClass().getSimpleName(),
+        "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + 
SamoaSerializerModule.class.getName() };
+    // "-emc=" + S4MOAModule.class.getName(),
+    // "@" +
+    // Resources.getResource("s4moa.properties").getFile(),
+
+    S4Config s4config = new S4Config();
+    JCommander jc = new JCommander(s4config);
+    jc.parse(args);
+
+    Map<String, String> namedParameters = new HashMap<String, String>();
+    for (String parameter : s4config.namedParameters) {
+      String[] param = parameter.split("=");
+      namedParameters.put(param[0], param[1]);
+    }
+
+    AppConfig config = new AppConfig.Builder()
+        .appClassName(s4config.appClass).appName(s4config.appName)
+        .appURI(s4config.appURI).namedParameters(namedParameters)
+        .build();
+
+    DeploymentUtils.initAppConfig(config, s4config.clusterName, true,
+        s4config.zkString);
+
+    System.out.println("Suposedly deployed on S4");
+  }
+
+  public void initHTTPServer() {
+
+  }
+
+  @Parameters(separators = "=")
+  public static class S4Config {
+
+    @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", 
required = true)
+    String clusterName = null;
+
+    @Parameter(names = "-appClass", description = "Main App class", required = 
false)
+    String appClass = null;
+
+    @Parameter(names = "-appName", description = "Application name", required 
= false)
+    String appName = null;
+
+    @Parameter(names = "-s4r", description = "Application URI", required = 
false)
+    String appURI = null;
+
+    @Parameter(names = "-zk", description = "ZooKeeper connection string", 
required = false)
+    String zkString = null;
+
+    @Parameter(names = { "-extraModulesClasses", "-emc" }, description = 
"Comma-separated list of additional configuration modules (they will be 
instantiated through their constructor without arguments).", required = false)
+    List<String> extraModules = new ArrayList<String>();
+
+    @Parameter(names = { "-p", "-namedStringParameters" }, description = 
"Comma-separated list of inline configuration "
+        + "parameters, taking precedence over homonymous configuration 
parameters from configuration files. "
+        + "Syntax: '-p=name1=value1,name2=value2 '", required = false, 
converter = ParsingUtils.InlineConfigParameterConverter.class)
+    List<String> namedParameters = new ArrayList<String>();
+
+  }
+
+  @Override
+  public void setLocal(boolean bool) {
+    // TODO S4 works the same for local and distributed environments
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java
new file mode 100644
index 0000000..413cfda
--- /dev/null
+++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java
@@ -0,0 +1,63 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.EntranceProcessingItem;
+
+public class S4Topology extends AbstractTopology {
+
+  // CASEY: it seems evaluationTask is not used.
+  // Remove it for now
+
+  // private String _evaluationTask;
+
+  // S4Topology(String topoName, String evalTask) {
+  // super(topoName);
+  // }
+  //
+  // S4Topology(String topoName) {
+  // this(topoName, null);
+  // }
+
+  // @Override
+  // public void setEvaluationTask(String evalTask) {
+  // _evaluationTask = evalTask;
+  // }
+  //
+  // @Override
+  // public String getEvaluationTask() {
+  // return _evaluationTask;
+  // }
+
+  S4Topology(String topoName) {
+    super(topoName);
+  }
+
+  protected EntranceProcessingItem getEntranceProcessingItem() {
+    if (this.getEntranceProcessingItems() == null)
+      return null;
+    if (this.getEntranceProcessingItems().size() < 1)
+      return null;
+    // TODO: support multiple entrance PIs
+    return (EntranceProcessingItem) 
this.getEntranceProcessingItems().toArray()[0];
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java
new file mode 100644
index 0000000..9f9f144
--- /dev/null
+++ b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java
@@ -0,0 +1,99 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.nio.ByteBuffer;
+
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.samoa.learners.classifiers.trees.AttributeContentEvent;
+import org.apache.samoa.learners.classifiers.trees.ComputeContentEvent;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+public class SamoaSerializer implements SerializerDeserializer {
+
+  private ThreadLocal<Kryo> kryoThreadLocal;
+  private ThreadLocal<Output> outputThreadLocal;
+
+  private int initialBufferSize = 2048;
+  private int maxBufferSize = 256 * 1024;
+
+  public void setMaxBufferSize(int maxBufferSize) {
+    this.maxBufferSize = maxBufferSize;
+  }
+
+  /**
+   * 
+   * @param classLoader
+   *          classloader able to handle classes to serialize/deserialize. For 
instance, application-level events can
+   *          only be handled by the application classloader.
+   */
+  @Inject
+  public SamoaSerializer(@Assisted final ClassLoader classLoader) {
+    kryoThreadLocal = new ThreadLocal<Kryo>() {
+
+      @Override
+      protected Kryo initialValue() {
+        Kryo kryo = new Kryo();
+        kryo.setClassLoader(classLoader);
+        kryo.register(AttributeContentEvent.class, new 
AttributeContentEvent.AttributeCEFullPrecSerializer());
+        kryo.register(ComputeContentEvent.class, new 
ComputeContentEvent.ComputeCEFullPrecSerializer());
+        kryo.setRegistrationRequired(false);
+        return kryo;
+      }
+    };
+
+    outputThreadLocal = new ThreadLocal<Output>() {
+      @Override
+      protected Output initialValue() {
+        Output output = new Output(initialBufferSize, maxBufferSize);
+        return output;
+      }
+    };
+
+  }
+
+  @Override
+  public Object deserialize(ByteBuffer rawMessage) {
+    Input input = new Input(rawMessage.array());
+    try {
+      return kryoThreadLocal.get().readClassAndObject(input);
+    } finally {
+      input.close();
+    }
+  }
+
+  @SuppressWarnings("resource")
+  @Override
+  public ByteBuffer serialize(Object message) {
+    Output output = outputThreadLocal.get();
+    try {
+      kryoThreadLocal.get().writeClassAndObject(output, message);
+      return ByteBuffer.wrap(output.toBytes());
+    } finally {
+      output.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
new file mode 100644
index 0000000..e530a09
--- /dev/null
+++ 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
@@ -0,0 +1,35 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.s4.base.SerializerDeserializer;
+
+import com.google.inject.AbstractModule;
+
+public class SamoaSerializerModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(SerializerDeserializer.class).to(SamoaSerializer.class);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-samza/pom.xml b/samoa-samza/pom.xml
index 0a04c65..1344031 100644
--- a/samoa-samza/pom.xml
+++ b/samoa-samza/pom.xml
@@ -31,14 +31,14 @@
 
   <artifactId>samoa-samza</artifactId>
   <parent>
-    <groupId>com.yahoo.labs.samoa</groupId>
+    <groupId>org.apache.samoa</groupId>
     <artifactId>samoa</artifactId>
     <version>0.3.0-SNAPSHOT</version>
   </parent>
 
   <dependencies>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-api</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
deleted file mode 100644
index 8f90478..0000000
--- a/samoa-samza/src/main/java/com/yahoo/labs/samoa/SamzaDoTask.java
+++ /dev/null
@@ -1,226 +0,0 @@
-package com.yahoo.labs.samoa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.ClassOption;
-import com.yahoo.labs.samoa.tasks.Task;
-import com.yahoo.labs.samoa.topology.impl.SamzaComponentFactory;
-import com.yahoo.labs.samoa.topology.impl.SamzaEngine;
-import com.yahoo.labs.samoa.topology.impl.SamzaTopology;
-import com.yahoo.labs.samoa.utils.SystemsUtils;
-
-/**
- * Main class to run the task on Samza
- * 
- * @author Anh Thu Vu
- */
-public class SamzaDoTask {
-
-  private static final Logger logger = 
LoggerFactory.getLogger(SamzaDoTask.class);
-
-  private static final String LOCAL_MODE = "local";
-  private static final String YARN_MODE = "yarn";
-
-  // FLAGS
-  private static final String YARN_CONF_FLAG = "--yarn_home";
-  private static final String MODE_FLAG = "--mode";
-  private static final String ZK_FLAG = "--zookeeper";
-  private static final String KAFKA_FLAG = "--kafka";
-  private static final String KAFKA_REPLICATION_FLAG = 
"--kafka_replication_factor";
-  private static final String CHECKPOINT_FREQ_FLAG = "--checkpoint_frequency";
-  private static final String JAR_PACKAGE_FLAG = "--jar_package";
-  private static final String SAMOA_HDFS_DIR_FLAG = "--samoa_hdfs_dir";
-  private static final String AM_MEMORY_FLAG = "--yarn_am_mem";
-  private static final String CONTAINER_MEMORY_FLAG = "--yarn_container_mem";
-  private static final String PI_PER_CONTAINER_FLAG = "--pi_per_container";
-
-  private static final String KRYO_REGISTER_FLAG = "--kryo_register";
-
-  // config values
-  private static int kafkaReplicationFactor = 1;
-  private static int checkpointFrequency = 60000;
-  private static String kafka = "localhost:9092";
-  private static String zookeeper = "localhost:2181";
-  private static boolean isLocal = true;
-  private static String yarnConfHome = null;
-  private static String samoaHDFSDir = null;
-  private static String jarPackagePath = null;
-  private static int amMem = 1024;
-  private static int containerMem = 1024;
-  private static int piPerContainer = 2;
-  private static String kryoRegisterFile = null;
-
-  /*
-   * 1. Read arguments 2. Construct topology/task 3. Upload the JAR to HDFS if
-   * we are running on YARN 4. Submit topology to SamzaEngine
-   */
-  public static void main(String[] args) {
-    // Read arguments
-    List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-    parseArguments(tmpArgs);
-
-    args = tmpArgs.toArray(new String[0]);
-
-    // Init Task
-    StringBuilder cliString = new StringBuilder();
-    for (int i = 0; i < args.length; i++) {
-      cliString.append(" ").append(args[i]);
-    }
-    logger.debug("Command line string = {}", cliString.toString());
-    System.out.println("Command line string = " + cliString.toString());
-
-    Task task = null;
-    try {
-      task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, null);
-      logger.info("Sucessfully instantiating {}", 
task.getClass().getCanonicalName());
-    } catch (Exception e) {
-      logger.error("Fail to initialize the task", e);
-      System.out.println("Fail to initialize the task" + e);
-      return;
-    }
-    task.setFactory(new SamzaComponentFactory());
-    task.init();
-
-    // Upload JAR file to HDFS
-    String hdfsPath = null;
-    if (!isLocal) {
-      Path path = FileSystems.getDefault().getPath(jarPackagePath);
-      hdfsPath = uploadJarToHDFS(path.toFile());
-      if (hdfsPath == null) {
-        System.out.println("Fail uploading JAR file \"" + 
path.toAbsolutePath().toString() + "\" to HDFS.");
-        return;
-      }
-    }
-
-    // Set parameters
-    SamzaEngine.getEngine()
-        .setLocalMode(isLocal)
-        .setZooKeeper(zookeeper)
-        .setKafka(kafka)
-        .setYarnPackage(hdfsPath)
-        .setKafkaReplicationFactor(kafkaReplicationFactor)
-        .setConfigHome(yarnConfHome)
-        .setAMMemory(amMem)
-        .setContainerMemory(containerMem)
-        .setPiPerContainerRatio(piPerContainer)
-        .setKryoRegisterFile(kryoRegisterFile)
-        .setCheckpointFrequency(checkpointFrequency);
-
-    // Submit topology
-    SamzaEngine.submitTopology((SamzaTopology) task.getTopology());
-
-  }
-
-  private static boolean isLocalMode(String mode) {
-    return mode.equals(LOCAL_MODE);
-  }
-
-  private static void parseArguments(List<String> args) {
-    for (int i = args.size() - 1; i >= 0; i--) {
-      String arg = args.get(i).trim();
-      String[] splitted = arg.split("=", 2);
-
-      if (splitted.length >= 2) {
-        // YARN config folder which contains conf/core-site.xml,
-        // conf/hdfs-site.xml, conf/yarn-site.xml
-        if (splitted[0].equals(YARN_CONF_FLAG)) {
-          yarnConfHome = splitted[1];
-          args.remove(i);
-        }
-        // host:port for zookeeper cluster
-        else if (splitted[0].equals(ZK_FLAG)) {
-          zookeeper = splitted[1];
-          args.remove(i);
-        }
-        // host:port,... for kafka broker(s)
-        else if (splitted[0].equals(KAFKA_FLAG)) {
-          kafka = splitted[1];
-          args.remove(i);
-        }
-        // whether we are running Samza in Local mode or YARN mode
-        else if (splitted[0].equals(MODE_FLAG)) {
-          isLocal = isLocalMode(splitted[1]);
-          args.remove(i);
-        }
-        // memory requirement for YARN application master
-        else if (splitted[0].equals(AM_MEMORY_FLAG)) {
-          amMem = Integer.parseInt(splitted[1]);
-          args.remove(i);
-        }
-        // memory requirement for YARN worker container
-        else if (splitted[0].equals(CONTAINER_MEMORY_FLAG)) {
-          containerMem = Integer.parseInt(splitted[1]);
-          args.remove(i);
-        }
-        // the path to JAR archive that we need to upload to HDFS
-        else if (splitted[0].equals(JAR_PACKAGE_FLAG)) {
-          jarPackagePath = splitted[1];
-          args.remove(i);
-        }
-        // the HDFS dir for SAMOA files
-        else if (splitted[0].equals(SAMOA_HDFS_DIR_FLAG)) {
-          samoaHDFSDir = splitted[1];
-          if (samoaHDFSDir.length() < 1)
-            samoaHDFSDir = null;
-          args.remove(i);
-        }
-        // number of max PI instances per container
-        // this will be used to compute the number of containers
-        // AM will request for the job
-        else if (splitted[0].equals(PI_PER_CONTAINER_FLAG)) {
-          piPerContainer = Integer.parseInt(splitted[1]);
-          args.remove(i);
-        }
-        // kafka streams replication factor
-        else if (splitted[0].equals(KAFKA_REPLICATION_FLAG)) {
-          kafkaReplicationFactor = Integer.parseInt(splitted[1]);
-          args.remove(i);
-        }
-        // checkpoint frequency in ms
-        else if (splitted[0].equals(CHECKPOINT_FREQ_FLAG)) {
-          checkpointFrequency = Integer.parseInt(splitted[1]);
-          args.remove(i);
-        }
-        // the file contains registration information for Kryo serializer
-        else if (splitted[0].equals(KRYO_REGISTER_FLAG)) {
-          kryoRegisterFile = splitted[1];
-          args.remove(i);
-        }
-      }
-    }
-  }
-
-  private static String uploadJarToHDFS(File file) {
-    SystemsUtils.setHadoopConfigHome(yarnConfHome);
-    SystemsUtils.setSAMOADir(samoaHDFSDir);
-    return SystemsUtils.copyToHDFS(file, file.getName());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
deleted file mode 100644
index 486472c..0000000
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamoaSystemFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemConsumer;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.system.SystemProducer;
-import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
-
-import 
com.yahoo.labs.samoa.topology.impl.SamzaEntranceProcessingItem.SamoaSystemConsumer;
-
-/**
- * Implementation of Samza's SystemFactory Samza will use this factory to get 
our custom consumer which gets the events
- * from SAMOA EntranceProcessor and feed them to EntranceProcessingItem task
- * 
- * @author Anh Thu Vu
- */
-public class SamoaSystemFactory implements SystemFactory {
-  @Override
-  public SystemAdmin getAdmin(String systemName, Config config) {
-    return new SinglePartitionWithoutOffsetsSystemAdmin();
-  }
-
-  @Override
-  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
-    return new SamoaSystemConsumer(systemName, config);
-  }
-
-  @Override
-  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
-    throw new SamzaException("This implementation is not supposed to produce 
anything.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
 
b/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
deleted file mode 100644
index 813c3b3..0000000
--- 
a/samoa-samza/src/main/java/com/yahoo/labs/samoa/topology/impl/SamzaComponentFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * Implementation of SAMOA ComponentFactory for Samza
- * 
- * @author Anh Thu Vu
- */
-public class SamzaComponentFactory implements ComponentFactory {
-  @Override
-  public ProcessingItem createPi(Processor processor) {
-    return this.createPi(processor, 1);
-  }
-
-  @Override
-  public ProcessingItem createPi(Processor processor, int parallelism) {
-    return new SamzaProcessingItem(processor, parallelism);
-  }
-
-  @Override
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
-    return new SamzaEntranceProcessingItem(entranceProcessor);
-  }
-
-  @Override
-  public Stream createStream(IProcessingItem sourcePi) {
-    return new SamzaStream(sourcePi);
-  }
-
-  @Override
-  public Topology createTopology(String topoName) {
-    return new SamzaTopology(topoName);
-  }
-}

Reply via email to