SAMOA-57: Remove support for S4

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/de050f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/de050f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/de050f99

Branch: refs/heads/master
Commit: de050f994d895dfb599d209ba267fcf5de132f2e
Parents: b86ab83
Author: Gianmarco De Francisci Morales <[email protected]>
Authored: Sun Mar 6 13:49:06 2016 +0300
Committer: Gianmarco De Francisci Morales <[email protected]>
Committed: Sun Mar 6 13:49:06 2016 +0300

----------------------------------------------------------------------
 .travis.yml                                     |  10 -
 pom.xml                                         |  12 -
 samoa-s4/pom.xml                                | 136 ----------
 samoa-s4/samoa-s4-adapter/pom.xml               |  54 ----
 .../samoa/topology/adapter/S4AdapterApp.java    |  45 ----
 .../adapter/S4EntranceProcessingItem.java       |  74 -----
 .../samoa/topology/adapter/package-info.java    |  28 --
 samoa-s4/src/main/assembly/samoa-s4.xml         |  78 ------
 .../samoa/topology/impl/S4ComponentFactory.java |  97 -------
 .../apache/samoa/topology/impl/S4DoTask.java    | 268 -------------------
 .../topology/impl/S4EntranceProcessingItem.java | 119 --------
 .../org/apache/samoa/topology/impl/S4Event.java |  91 -------
 .../samoa/topology/impl/S4ProcessingItem.java   | 186 -------------
 .../apache/samoa/topology/impl/S4Stream.java    | 184 -------------
 .../apache/samoa/topology/impl/S4Submitter.java | 144 ----------
 .../apache/samoa/topology/impl/S4Topology.java  |  63 -----
 .../samoa/topology/impl/SamoaSerializer.java    |  99 -------
 .../topology/impl/SamoaSerializerModule.java    |  35 ---
 18 files changed, 1723 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a122ebf..6cf1ad3 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,16 +8,6 @@ jdk:
 - oraclejdk7
 
 install:
-- git clone https://github.com/apache/incubator-s4.git
-- cd incubator-s4
-- git checkout tags/0.6.0-Final
-- mv ../bin/s4-build/gradlew .
-- wget http://people.apache.org/~gdfm/gradle-wrapper-1.4.jar
-- mv gradle-wrapper-1.4.jar ./lib/
-- mv ../bin/s4-build/gradle-wrapper-1.4.properties ./lib/
-- ./gradlew install
-- ./gradlew s4-tools::installApp
-- cd ..
 - echo 
"<settings><servers><server><id>apache.snapshots.https</id><username>${SOSS_USERNAME}</username><password>${SOSS_PASSWORD}</password></server></servers></settings>"
 > ${HOME}/.m2/settings.xml
 #- cat ${HOME}/.m2/settings.xml
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af8fe98..3304937 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,15 +76,6 @@
             </modules>
         </profile>
         <profile>
-            <id>s4</id>
-            <modules>
-                <module>samoa-instances</module>
-                <module>samoa-api</module>
-                <module>samoa-s4</module>
-                <module>samoa-test</module>
-            </modules>
-        </profile>
-        <profile>
             <id>flink</id>
             <modules>
                 <module>samoa-instances</module>
@@ -111,7 +102,6 @@
                 <module>samoa-threads</module>
                 <module>samoa-storm</module>
                 <module>samoa-flink</module>
-                <module>samoa-s4</module>
                 <module>samoa-samza</module>
                 <module>samoa-test</module>
             </modules>
@@ -135,7 +125,6 @@
         <kryo.version>2.21</kryo.version>
         <metrics-core.version>2.2.0</metrics-core.version>
         <miniball.version>1.0.3</miniball.version>
-        <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
         <flink.version>0.10.1</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
@@ -219,7 +208,6 @@
                         <root>samoa-instances</root>
                         <root>samoa-local</root>
                         <root>samoa-storm</root>
-                        <root>samoa-s4</root>
                         <root>samoa-flink</root>
                         <root>samoa-samza</root>
                         <root>samoa-test</root>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/pom.xml b/samoa-s4/pom.xml
deleted file mode 100644
index f3e24a3..0000000
--- a/samoa-s4/pom.xml
+++ /dev/null
@@ -1,136 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  #%L
-  SAMOA
-  %%
-  Copyright (C) 2014 - 2015 Apache Software Foundation
-  %%
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  #L%
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
-  <name>samoa-s4</name>
-  <description>S4 bindings for SAMOA</description>
-
-  <artifactId>samoa-s4</artifactId>
-  <parent>
-    <groupId>org.apache.samoa</groupId>
-    <artifactId>samoa</artifactId>
-    <version>0.4.0-incubating-SNAPSHOT</version>
-  </parent>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.samoa</groupId>
-      <artifactId>samoa-api</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>com.github.javacliparser</groupId>
-      <artifactId>javacliparser</artifactId>
-      <version>${javacliparser.version}</version>
-    </dependency>
-
-    <!-- S4 dependencies need to be installed separately as they are not 
available via Maven yet -->
-    <dependency>
-      <groupId>org.apache.s4</groupId>
-      <artifactId>s4-base</artifactId>
-      <version>${s4.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.s4</groupId>
-      <artifactId>s4-comm</artifactId>
-      <version>${s4.version}</version>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.s4</groupId>
-      <artifactId>s4-core</artifactId>
-      <version>${s4.version}</version>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-dependency-plugin</artifactId>
-        <version>${maven-dependency-plugin.version}</version>
-        <configuration>
-          <outputDirectory>${project.build.directory}/lib</outputDirectory>
-          <overWriteReleases>false</overWriteReleases>
-          <overWriteSnapshots>false</overWriteSnapshots>
-          <overWriteIfNewer>true</overWriteIfNewer>
-
-          <excludeGroupIds>org.apache.s4</excludeGroupIds>
-          <excludeTransitive>true</excludeTransitive>
-        </configuration>
-        <executions>
-          <execution>
-            <id>copy-dependencies</id>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- SAMOA assembly -->
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>${maven-assembly-plugin.version}</version>
-        <configuration>
-          <descriptors>
-            <descriptor>src/main/assembly/samoa-s4.xml</descriptor>
-          </descriptors>
-          <finalName>SAMOA-S4-${project.version}</finalName>
-          <attach>false</attach>
-          <outputDirectory>../target</outputDirectory>
-          <appendAssemblyId>false</appendAssemblyId>
-          <archive>
-            <manifestEntries>
-              <Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>
-              <Bundle-Description>${project.description}</Bundle-Description>
-              
<Implementation-Version>${project.version}</Implementation-Version>
-              <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
-              <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
-              
<S4-App-Class>org.apache.samoa.topology.impl.S4DoTask</S4-App-Class>
-              <S4-Version>${s4.version}</S4-Version>
-            </manifestEntries>
-          </archive>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id> <!-- this is used for inheritance merges -->
-            <phase>package</phase> <!-- bind to the packaging phase -->
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/samoa-s4-adapter/pom.xml 
b/samoa-s4/samoa-s4-adapter/pom.xml
deleted file mode 100644
index 5a66a1e..0000000
--- a/samoa-s4/samoa-s4-adapter/pom.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  #%L
-  SAMOA
-  %%
-  Copyright (C) 2014 - 2015 Apache Software Foundation
-  %%
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  #L%
-  -->
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-     http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>samoa-s4-adapter</artifactId>
-  <groupId>org.apache.samoa</groupId>
-  <version>0.1</version>
-  <name>samoa-s4-adapter</name>
-  <description>Adapter module to connect to external stream and also to 
provide entrance processing items for SAMOA</description>
-
-  <dependencies>
-    <dependency>
-      <artifactId>samoa-s4</artifactId>
-      <groupId>org.apache.samoa</groupId>
-      <version>0.1</version>
-    </dependency>
-  </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
deleted file mode 100644
index 1634502..0000000
--- 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package samoa.topology.adapter;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.s4.core.adapter.AdapterApp;
-
-import samoa.sandbox.SourceProcessor;
-import samoa.streams.StreamSourceProcessor;
-
-public class S4AdapterApp extends AdapterApp {
-
-       S4EntranceProcessingItem entrancePI;
-       StreamSourceProcessor sourceProcessor;
-       
-       @Override
-       protected void onInit() {
-               entrancePI = new S4EntranceProcessingItem(this);
-               sourceProcessor = new StreamSourceProcessor();
-               entrancePI.setProcessor(sourceProcessor);
-       }
-       
-       @Override
-       protected void onStart() {
-               
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
deleted file mode 100644
index 4c22a0b..0000000
--- 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package samoa.topology.adapter;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-
-import samoa.core.Processor;
-import samoa.topology.EntranceProcessingItem;
-import samoa.topology.impl.DoTaskApp;
-import weka.core.Instance;
-
-public class S4EntranceProcessingItem extends ProcessingElement implements 
EntranceProcessingItem {
-
-       private Processor processor;
-       //DoTaskApp app;
-       
-       
-       public S4EntranceProcessingItem(App app){
-               super(app);
-               //this.app = (DoTaskApp) app;
-               this.setSingleton(true);
-               
-       }
-
-       @Override
-       public Processor getProcessor() {
-               return this.processor;
-       }
-
-       @Override
-       public void put(Instance inst) {
-               // do nothing
-               //may not needed
-
-       }
-
-       @Override
-       protected void onCreate() {
-               
-               //              if (this.processor != null){
-//                     this.processor = 
this.processor.newProcessor(this.processor);
-//                     this.processor.onCreate(Integer.parseInt(getId()));
-//             }
-       }
-
-       @Override
-       protected void onRemove() {
-               //do nothing
-               
-       }
-       
-       public void setProcessor(Processor processor){
-               this.processor = processor;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
deleted file mode 100644
index 2203a32..0000000
--- 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * 
- */
-/**
- * @author severien
- *
- */
-package samoa.topology.adapter;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/assembly/samoa-s4.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/assembly/samoa-s4.xml 
b/samoa-s4/src/main/assembly/samoa-s4.xml
deleted file mode 100644
index 41a93f5..0000000
--- a/samoa-s4/src/main/assembly/samoa-s4.xml
+++ /dev/null
@@ -1,78 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  #%L
-  SAMOA
-  %%
-  Copyright (C) 2014 - 2015 Apache Software Foundation
-  %%
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-  
-       http://www.apache.org/licenses/LICENSE-2.0
-  
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  #L%
-  -->
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-     http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-  
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
-  <id>dist</id>
-  <formats>
-    <format>jar</format>
-  </formats>
-  <includeBaseDirectory>false</includeBaseDirectory>
-
-  <fileSets>
-    <!-- SAMOA API artifacts -->
-    <fileSet>
-      <outputDirectory>lib/</outputDirectory>
-      <directory>../samoa-api/target/lib/</directory>
-      <includes>
-        <include>*</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <outputDirectory>app/</outputDirectory>
-      <directory>../samoa-api/target/</directory>
-      <includes>
-        <include>samoa-api-*.jar</include>
-      </includes>
-    </fileSet>
-
-    <!-- SAMOA S4 artifacts -->
-    <fileSet>
-      <outputDirectory>app/</outputDirectory>
-      <directory>target/</directory>
-      <includes>
-        <include>samoa-s4-*.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
-      <outputDirectory>/</outputDirectory>
-      <directory>target/</directory>
-      <includes>
-        <include>lib/*</include>
-      </includes>
-    </fileSet>
-  </fileSets>
-
-</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java
deleted file mode 100644
index ebd18f9..0000000
--- 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ComponentFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.core.Processor;
-import org.apache.samoa.topology.ComponentFactory;
-import org.apache.samoa.topology.EntranceProcessingItem;
-import org.apache.samoa.topology.IProcessingItem;
-import org.apache.samoa.topology.ProcessingItem;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.topology.Topology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * S4 Platform Component Factory
- * 
- * @author severien
- * 
- */
-public class S4ComponentFactory implements ComponentFactory {
-
-  public static final Logger logger = 
LoggerFactory.getLogger(S4ComponentFactory.class);
-  protected S4DoTask app;
-
-  @Override
-  public ProcessingItem createPi(Processor processor, int paralellism) {
-    S4ProcessingItem processingItem = new S4ProcessingItem(app);
-    // TODO refactor how to set the paralellism level
-    processingItem.setParalellismLevel(paralellism);
-    processingItem.setProcessor(processor);
-
-    return processingItem;
-  }
-
-  @Override
-  public ProcessingItem createPi(Processor processor) {
-    return this.createPi(processor, 1);
-  }
-
-  @Override
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
-    // TODO Create source Entry processing item that connects to an external
-    // stream
-    S4EntranceProcessingItem entrancePi = new 
S4EntranceProcessingItem(entranceProcessor, app);
-    entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
-    return entrancePi;
-  }
-
-  @Override
-  public Stream createStream(IProcessingItem sourcePi) {
-    S4Stream aStream = new S4Stream(app);
-    return aStream;
-  }
-
-  @Override
-  public Topology createTopology(String topoName) {
-    return new S4Topology(topoName);
-  }
-
-  /**
-   * Initialization method.
-   * 
-   * @param evalTask
-   */
-  public void init(String evalTask) {
-    // Task is initiated in the DoTaskApp
-  }
-
-  /**
-   * Sets S4 application.
-   * 
-   * @param app
-   */
-  public void setApp(S4DoTask app) {
-    this.app = app;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java
deleted file mode 100644
index d52e981..0000000
--- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4DoTask.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.apache.samoa.core.Globals;
-import org.apache.samoa.tasks.Task;
-import org.apache.samoa.topology.ComponentFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.Option;
-import com.github.javacliparser.ClassOption;
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * S4 App that runs samoa Tasks
- *
- * */
-
-/**
- * The Class DoTaskApp.
- */
-final public class S4DoTask extends App {
-
-  private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
-  Task task;
-
-  @Inject
-  @Named("evalTask")
-  public String evalTask;
-
-  public S4DoTask() {
-    super();
-  }
-
-  /** The engine. */
-  protected ComponentFactory componentFactory;
-
-  /**
-   * Gets the factory.
-   * 
-   * @return the factory
-   */
-  public ComponentFactory getFactory() {
-    return componentFactory;
-  }
-
-  /**
-   * Sets the factory.
-   * 
-   * @param factory
-   *          the new factory
-   */
-  public void setFactory(ComponentFactory factory) {
-    this.componentFactory = factory;
-  }
-
-  /*
-   * Build the application
-   * 
-   * @see org.apache.s4.core.App#onInit()
-   */
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#onInit()
-   */
-  @Override
-  protected void onInit() {
-    logger.info("DoTaskApp onInit");
-    // ConsoleReporters prints S4 metrics
-    // MetricsRegistry mr = new MetricsRegistry();
-    //
-    // CsvReporter.enable(new File(System.getProperty("user.home")
-    // + "/monitor/"), 10, TimeUnit.SECONDS);
-    // ConsoleReporter.enable(10, TimeUnit.SECONDS);
-    try {
-      System.err.println();
-      System.err.println(Globals.getWorkbenchInfoString());
-      System.err.println();
-
-    } catch (Exception ex) {
-      ex.printStackTrace();
-    }
-    S4ComponentFactory factory = new S4ComponentFactory();
-    factory.setApp(this);
-
-    // logger.debug("LC {}", lc);
-
-    // task = TaskProvider.getTask(evalTask);
-
-    // EXAMPLE OPTIONS
-    // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
-    // 5 -N 0.0)
-    // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
-    // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
-    // "-K", "5", "-N", "0.0)"};
-    // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
-    // "-g", "clustream.Clustream", "-i", "100000", "-s",
-    // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
-    logger.debug("PARAMETERS {}", evalTask);
-    // params = params.replace(":", " ");
-    List<String> parameters = new ArrayList<String>();
-    // parameters.add(evalTask);
-    try {
-      parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, 
"UTF-8").split(" ")));
-    } catch (UnsupportedEncodingException ex) {
-      ex.printStackTrace();
-    }
-    String[] args = parameters.toArray(new String[0]);
-    Option[] extraOptions = new Option[] {};
-    // build a single string by concatenating cli options
-    StringBuilder cliString = new StringBuilder();
-    for (int i = 0; i < args.length; i++) {
-      cliString.append(" ").append(args[i]);
-    }
-
-    // parse options
-    try {
-      task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, extraOptions);
-      task.setFactory(factory);
-      task.init();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#onStart()
-   */
-  @Override
-  protected void onStart() {
-    logger.info("Starting DoTaskApp... App Partition [{}]", 
this.getPartitionId());
-    // <<<<<<< HEAD Task doesn't have start in latest storm-impl
-    // TODO change the way the app starts
-    // if (this.getPartitionId() == 0)
-    S4Topology s4topology = (S4Topology) getTask().getTopology();
-    S4EntranceProcessingItem epi = (S4EntranceProcessingItem) 
s4topology.getEntranceProcessingItem();
-    while (epi.injectNextEvent())
-      // inject events from the EntrancePI
-      ;
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#onClose()
-   */
-  @Override
-  protected void onClose() {
-    System.out.println("Closing DoTaskApp...");
-
-  }
-
-  /**
-   * Gets the task.
-   * 
-   * @return the task
-   */
-  public Task getTask() {
-    return task;
-  }
-
-  // These methods are protected in App and can not be accessed from outside.
-  // They are
-  // called from parallel classifiers and evaluations. Is there a better way
-  // to do that?
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#createPE(java.lang.Class)
-   */
-  @Override
-  public <T extends ProcessingElement> T createPE(Class<T> type) {
-    return super.createPE(type);
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#createStream(java.lang.String,
-   * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
-   */
-  @Override
-  public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> 
finder,
-      ProcessingElement... processingElements) {
-    return super.createStream(name, finder, processingElements);
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#createStream(java.lang.String,
-   * org.apache.s4.core.ProcessingElement[])
-   */
-  @Override
-  public <T extends Event> Stream<T> createStream(String name, 
ProcessingElement... processingElements) {
-    return super.createStream(name, processingElements);
-  }
-
-  // @com.beust.jcommander.Parameters(separators = "=")
-  // class Parameters {
-  //
-  // @Parameter(names={"-lc","-local"}, description="Local clustering method")
-  // private String localClustering;
-  //
-  // @Parameter(names={"-gc","-global"},
-  // description="Global clustering method")
-  // private String globalClustering;
-  //
-  // }
-  //
-  // class ParametersConverter {// implements IStringConverter<String[]> {
-  //
-  //
-  // public String[] convertToArgs(String value) {
-  //
-  // String[] params = value.split(",");
-  // String[] args = new String[params.length*2];
-  // for(int i=0; i<params.length ; i++) {
-  // args[i] = params[i].split("=")[0];
-  // args[i+1] = params[i].split("=")[1];
-  // i++;
-  // }
-  // return args;
-  // }
-  //
-  // }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
deleted file mode 100644
index 771cbc8..0000000
--- 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4EntranceProcessingItem.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.topology.EntranceProcessingItem;
-import org.apache.samoa.topology.Stream;
-
-// TODO adapt this entrance processing item to connect to external streams so 
the application doesnt need to use an AdapterApp
-
-public class S4EntranceProcessingItem extends ProcessingElement implements 
EntranceProcessingItem {
-
-  private EntranceProcessor entranceProcessor;
-  // private S4DoTask app;
-  private int parallelism;
-  protected Stream outputStream;
-
-  /**
-   * Constructor of an S4 entrance processing item.
-   * 
-   * @param app
-   *          : S4 application
-   */
-  public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App 
app) {
-    super(app);
-    this.entranceProcessor = entranceProcessor;
-    // this.app = (S4DoTask) app;
-    // this.setSingleton(true);
-  }
-
-  public void setParallelism(int parallelism) {
-    this.parallelism = parallelism;
-  }
-
-  public int getParallelism() {
-    return this.parallelism;
-  }
-
-  @Override
-  public EntranceProcessor getProcessor() {
-    return this.entranceProcessor;
-  }
-
-  //
-  // @Override
-  // public void put(Instance inst) {
-  // // do nothing
-  // // may not needed
-  // }
-
-  @Override
-  protected void onCreate() {
-    // was commented
-    if (this.entranceProcessor != null) {
-      // TODO revisit if we need to change it to a clone() call
-      this.entranceProcessor = (EntranceProcessor) 
this.entranceProcessor.newProcessor(this.entranceProcessor);
-      this.entranceProcessor.onCreate(Integer.parseInt(getId()));
-    }
-  }
-
-  @Override
-  protected void onRemove() {
-    // do nothing
-  }
-
-  //
-  // /**
-  // * Sets the entrance processing item processor.
-  // *
-  // * @param processor
-  // */
-  // public void setProcessor(Processor processor) {
-  // this.entranceProcessor = processor;
-  // }
-
-  @Override
-  public void setName(String name) {
-    super.setName(name);
-  }
-
-  @Override
-  public EntranceProcessingItem setOutputStream(Stream stream) {
-    if (this.outputStream != null)
-      throw new IllegalStateException("Output stream for an EntrancePI sohuld 
be initialized only once");
-    this.outputStream = stream;
-    return this;
-  }
-
-  public boolean injectNextEvent() {
-    if (entranceProcessor.hasNext()) {
-      ContentEvent nextEvent = this.entranceProcessor.nextEvent();
-      outputStream.put(nextEvent);
-      return entranceProcessor.hasNext();
-    } else
-      return false;
-    // return !nextEvent.isLastEvent();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java
----------------------------------------------------------------------
diff --git a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java
deleted file mode 100644
index 154715b..0000000
--- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Event.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import net.jcip.annotations.Immutable;
-
-import org.apache.s4.base.Event;
-import org.apache.samoa.core.ContentEvent;
-
-/**
- * The Class InstanceEvent.
- */
-@Immutable
-final public class S4Event extends Event {
-
-  private String key;
-
-  public String getKey() {
-    return key;
-  }
-
-  public void setKey(String key) {
-    this.key = key;
-  }
-
-  /** The content event. */
-  private ContentEvent contentEvent;
-
-  /**
-   * Instantiates a new instance event.
-   */
-  public S4Event() {
-    // Needed for serialization of kryo
-  }
-
-  /**
-   * Instantiates a new instance event.
-   * 
-   * @param contentEvent
-   *          the content event
-   */
-  public S4Event(ContentEvent contentEvent) {
-    if (contentEvent != null) {
-      this.contentEvent = contentEvent;
-      this.key = contentEvent.getKey();
-
-    }
-  }
-
-  /**
-   * Gets the content event.
-   * 
-   * @return the content event
-   */
-  public ContentEvent getContentEvent() {
-    return contentEvent;
-  }
-
-  /**
-   * Sets the content event.
-   * 
-   * @param contentEvent
-   *          the new content event
-   */
-  public void setContentEvent(ContentEvent contentEvent) {
-    this.contentEvent = contentEvent;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java
deleted file mode 100644
index b9c7467..0000000
--- 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4ProcessingItem.java
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.samoa.core.Processor;
-import org.apache.samoa.topology.ProcessingItem;
-import org.apache.samoa.topology.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * S4 Platform platform specific processing item, inherits from S4 
ProcessinElemnt.
- * 
- * @author severien
- * 
- */
-public class S4ProcessingItem extends ProcessingElement implements
-    ProcessingItem {
-
-  public static final Logger logger = LoggerFactory
-      .getLogger(S4ProcessingItem.class);
-
-  private Processor processor;
-  private int paralellismLevel;
-  private S4DoTask app;
-
-  private static final String NAME = "PROCESSING-ITEM-";
-  private static int OBJ_COUNTER = 0;
-
-  /**
-   * Constructor of S4 ProcessingItem.
-   * 
-   * @param app
-   *          : S4 application
-   */
-  public S4ProcessingItem(App app) {
-    super(app);
-    super.setName(NAME + OBJ_COUNTER);
-    OBJ_COUNTER++;
-    this.app = (S4DoTask) app;
-    this.paralellismLevel = 1;
-  }
-
-  @Override
-  public String getName() {
-    return super.getName();
-  }
-
-  /**
-   * Gets processing item paralellism level.
-   * 
-   * @return int
-   */
-  public int getParalellismLevel() {
-    return paralellismLevel;
-  }
-
-  /**
-   * Sets processing item paralellism level.
-   * 
-   * @param paralellismLevel
-   */
-  public void setParalellismLevel(int paralellismLevel) {
-    this.paralellismLevel = paralellismLevel;
-  }
-
-  /**
-   * onEvent method.
-   * 
-   * @param event
-   */
-  public void onEvent(S4Event event) {
-    if (processor.process(event.getContentEvent()) == true) {
-      close();
-    }
-  }
-
-  /**
-   * Sets S4 processing item processor.
-   * 
-   * @param processor
-   */
-  public void setProcessor(Processor processor) {
-    this.processor = processor;
-  }
-
-  // Methods from ProcessingItem
-  @Override
-  public Processor getProcessor() {
-    return processor;
-  }
-
-  /**
-   * KeyFinder sets the keys for a specific event.
-   * 
-   * @return KeyFinder
-   */
-  private KeyFinder<S4Event> getKeyFinder() {
-    KeyFinder<S4Event> keyFinder = new KeyFinder<S4Event>() {
-      @Override
-      public List<String> get(S4Event s4event) {
-        List<String> results = new ArrayList<String>();
-        results.add(s4event.getKey());
-        return results;
-      }
-    };
-
-    return keyFinder;
-  }
-
-  @Override
-  public ProcessingItem connectInputAllStream(Stream inputStream) {
-
-    S4Stream stream = (S4Stream) inputStream;
-    stream.setParallelism(this.paralellismLevel);
-    stream.addStream(inputStream.getStreamId(),
-        getKeyFinder(), this, S4Stream.BROADCAST);
-    return this;
-  }
-
-  @Override
-  public ProcessingItem connectInputKeyStream(Stream inputStream) {
-
-    S4Stream stream = (S4Stream) inputStream;
-    stream.setParallelism(this.paralellismLevel);
-    stream.addStream(inputStream.getStreamId(),
-        getKeyFinder(), this, S4Stream.GROUP_BY_KEY);
-
-    return this;
-  }
-
-  @Override
-  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
-    S4Stream stream = (S4Stream) inputStream;
-    stream.setParallelism(this.paralellismLevel);
-    stream.addStream(inputStream.getStreamId(),
-        getKeyFinder(), this, S4Stream.SHUFFLE);
-
-    return this;
-  }
-
-  // Methods from ProcessingElement
-  @Override
-  protected void onCreate() {
-    logger.debug("PE ID {}", getId());
-    if (this.processor != null) {
-      this.processor = this.processor.newProcessor(this.processor);
-      this.processor.onCreate(Integer.parseInt(getId()));
-    }
-  }
-
-  @Override
-  protected void onRemove() {
-    // do nothing
-  }
-
-  @Override
-  public int getParallelism() {
-    return this.paralellismLevel;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java
deleted file mode 100644
index 734462e..0000000
--- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Stream.java
+++ /dev/null
@@ -1,184 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.s4.base.KeyFinder;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.topology.AbstractStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * S4 Platform specific stream.
- * 
- * @author severien
- * 
- */
-public class S4Stream extends AbstractStream {
-
-  public static final int SHUFFLE = 0;
-  public static final int GROUP_BY_KEY = 1;
-  public static final int BROADCAST = 2;
-
-  private static final Logger logger = LoggerFactory.getLogger(S4Stream.class);
-
-  private S4DoTask app;
-  private int processingItemParalellism;
-  private int shuffleCounter;
-
-  private static final String NAME = "STREAM-";
-  private static int OBJ_COUNTER = 0;
-
-  /* The stream list */
-  public List<StreamType> streams;
-
-  public S4Stream(S4DoTask app) {
-    super();
-    this.app = app;
-    this.processingItemParalellism = 1;
-    this.shuffleCounter = 0;
-    this.streams = new ArrayList<StreamType>();
-    this.setStreamId(NAME + OBJ_COUNTER);
-    OBJ_COUNTER++;
-  }
-
-  public S4Stream(S4DoTask app, S4ProcessingItem pi) {
-    super();
-    this.app = app;
-    this.processingItemParalellism = 1;
-    this.shuffleCounter = 0;
-    this.streams = new ArrayList<StreamType>();
-    this.setStreamId(NAME + OBJ_COUNTER);
-    OBJ_COUNTER++;
-
-  }
-
-  /**
-   * 
-   * @return
-   */
-  public int getParallelism() {
-    return processingItemParalellism;
-  }
-
-  public void setParallelism(int parallelism) {
-    this.processingItemParalellism = parallelism;
-  }
-
-  public void addStream(String streamID, KeyFinder<S4Event> finder,
-      S4ProcessingItem pi, int type) {
-    String streamName = streamID + "_" + pi.getName();
-    org.apache.s4.core.Stream<S4Event> stream = this.app.createStream(
-        streamName, pi);
-    stream.setName(streamName);
-    logger.debug("Stream name S4Stream {}", streamName);
-    if (finder != null)
-      stream.setKey(finder);
-    this.streams.add(new StreamType(stream, type));
-
-  }
-
-  @Override
-  public void put(ContentEvent event) {
-
-    for (int i = 0; i < streams.size(); i++) {
-
-      switch (streams.get(i).getType()) {
-      case SHUFFLE:
-        S4Event s4event = new S4Event(event);
-        s4event.setStreamId(streams.get(i).getStream().getName());
-        if (getParallelism() == 1) {
-          s4event.setKey("0");
-        } else {
-          s4event.setKey(Integer.toString(shuffleCounter));
-        }
-        streams.get(i).getStream().put(s4event);
-        shuffleCounter++;
-        if (shuffleCounter >= (getParallelism())) {
-          shuffleCounter = 0;
-        }
-
-        break;
-
-      case GROUP_BY_KEY:
-        S4Event s4event1 = new S4Event(event);
-        s4event1.setStreamId(streams.get(i).getStream().getName());
-        HashCodeBuilder hb = new HashCodeBuilder();
-        hb.append(event.getKey());
-        String key = Integer.toString(hb.build() % getParallelism());
-        s4event1.setKey(key);
-        streams.get(i).getStream().put(s4event1);
-        break;
-
-      case BROADCAST:
-        for (int p = 0; p < this.getParallelism(); p++) {
-          S4Event s4event2 = new S4Event(event);
-          s4event2.setStreamId(streams.get(i).getStream().getName());
-          s4event2.setKey(Integer.toString(p));
-          streams.get(i).getStream().put(s4event2);
-        }
-        break;
-
-      default:
-        break;
-      }
-
-    }
-
-  }
-
-  /**
-   * Subclass for definig stream connection type
-   * 
-   * @author severien
-   * 
-   */
-  class StreamType {
-    org.apache.s4.core.Stream<S4Event> stream;
-    int type;
-
-    public StreamType(org.apache.s4.core.Stream<S4Event> s, int t) {
-      this.stream = s;
-      this.type = t;
-    }
-
-    public org.apache.s4.core.Stream<S4Event> getStream() {
-      return stream;
-    }
-
-    public void setStream(org.apache.s4.core.Stream<S4Event> stream) {
-      this.stream = stream;
-    }
-
-    public int getType() {
-      return type;
-    }
-
-    public void setType(int type) {
-      this.type = type;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java
deleted file mode 100644
index 22807a6..0000000
--- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Submitter.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.s4.core.util.AppConfig;
-import org.apache.s4.core.util.ParsingUtils;
-import org.apache.s4.deploy.DeploymentUtils;
-import org.apache.samoa.tasks.Task;
-import org.apache.samoa.topology.ISubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-
-public class S4Submitter implements ISubmitter {
-
-  private static Logger logger = LoggerFactory.getLogger(S4Submitter.class);
-
-  @Override
-  public void deployTask(Task task) {
-    // TODO: Get application FROM HTTP server
-    // TODO: Initializa a http server to serve the app package
-
-    String appURIString = null;
-    // File app = new File(System.getProperty("user.dir")
-    // + "/src/site/dist/SAMOA-S4-0.1-dist.jar");
-
-    // TODO: String app url http://localhost:8000/SAMOA-S4-0.1-dist.jar
-    try {
-      URL appURL = new URL("http://localhost:8000/SAMOA-S4-0.1.jar";);
-      appURIString = appURL.toString();
-    } catch (MalformedURLException e1) {
-      e1.printStackTrace();
-    }
-
-    // try {
-    // appURIString = app.toURI().toURL().toString();
-    // } catch (MalformedURLException e) {
-    // e.printStackTrace();
-    // }
-    if (task == null) {
-      logger.error("Can't execute since evaluation task is not set!");
-      return;
-    } else {
-      logger.info("Deploying SAMOA S4 task [{}] from location [{}]. ",
-          task.getClass().getSimpleName(), appURIString);
-    }
-
-    String[] args = { "-c=testCluster2",
-        "-appClass=" + S4DoTask.class.getName(),
-        "-appName=" + "samoaApp",
-        "-p=evalTask=" + task.getClass().getSimpleName(),
-        "-zk=localhost:2181", "-s4r=" + appURIString, "-emc=" + 
SamoaSerializerModule.class.getName() };
-    // "-emc=" + S4MOAModule.class.getName(),
-    // "@" +
-    // Resources.getResource("s4moa.properties").getFile(),
-
-    S4Config s4config = new S4Config();
-    JCommander jc = new JCommander(s4config);
-    jc.parse(args);
-
-    Map<String, String> namedParameters = new HashMap<String, String>();
-    for (String parameter : s4config.namedParameters) {
-      String[] param = parameter.split("=");
-      namedParameters.put(param[0], param[1]);
-    }
-
-    AppConfig config = new AppConfig.Builder()
-        .appClassName(s4config.appClass).appName(s4config.appName)
-        .appURI(s4config.appURI).namedParameters(namedParameters)
-        .build();
-
-    DeploymentUtils.initAppConfig(config, s4config.clusterName, true,
-        s4config.zkString);
-
-    System.out.println("Suposedly deployed on S4");
-  }
-
-  public void initHTTPServer() {
-
-  }
-
-  @Parameters(separators = "=")
-  public static class S4Config {
-
-    @Parameter(names = { "-c", "-cluster" }, description = "Cluster name", 
required = true)
-    String clusterName = null;
-
-    @Parameter(names = "-appClass", description = "Main App class", required = 
false)
-    String appClass = null;
-
-    @Parameter(names = "-appName", description = "Application name", required 
= false)
-    String appName = null;
-
-    @Parameter(names = "-s4r", description = "Application URI", required = 
false)
-    String appURI = null;
-
-    @Parameter(names = "-zk", description = "ZooKeeper connection string", 
required = false)
-    String zkString = null;
-
-    @Parameter(names = { "-extraModulesClasses", "-emc" }, description = 
"Comma-separated list of additional configuration modules (they will be 
instantiated through their constructor without arguments).", required = false)
-    List<String> extraModules = new ArrayList<String>();
-
-    @Parameter(names = { "-p", "-namedStringParameters" }, description = 
"Comma-separated list of inline configuration "
-        + "parameters, taking precedence over homonymous configuration 
parameters from configuration files. "
-        + "Syntax: '-p=name1=value1,name2=value2 '", required = false, 
converter = ParsingUtils.InlineConfigParameterConverter.class)
-    List<String> namedParameters = new ArrayList<String>();
-
-  }
-
-  @Override
-  public void setLocal(boolean bool) {
-    // TODO S4 works the same for local and distributed environments
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java
deleted file mode 100644
index 413cfda..0000000
--- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/S4Topology.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.samoa.topology.AbstractTopology;
-import org.apache.samoa.topology.EntranceProcessingItem;
-
-public class S4Topology extends AbstractTopology {
-
-  // CASEY: it seems evaluationTask is not used.
-  // Remove it for now
-
-  // private String _evaluationTask;
-
-  // S4Topology(String topoName, String evalTask) {
-  // super(topoName);
-  // }
-  //
-  // S4Topology(String topoName) {
-  // this(topoName, null);
-  // }
-
-  // @Override
-  // public void setEvaluationTask(String evalTask) {
-  // _evaluationTask = evalTask;
-  // }
-  //
-  // @Override
-  // public String getEvaluationTask() {
-  // return _evaluationTask;
-  // }
-
-  S4Topology(String topoName) {
-    super(topoName);
-  }
-
-  protected EntranceProcessingItem getEntranceProcessingItem() {
-    if (this.getEntranceProcessingItems() == null)
-      return null;
-    if (this.getEntranceProcessingItems().size() < 1)
-      return null;
-    // TODO: support multiple entrance PIs
-    return (EntranceProcessingItem) 
this.getEntranceProcessingItems().toArray()[0];
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java
deleted file mode 100644
index 9f9f144..0000000
--- a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializer.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.nio.ByteBuffer;
-
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.samoa.learners.classifiers.trees.AttributeContentEvent;
-import org.apache.samoa.learners.classifiers.trees.ComputeContentEvent;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-public class SamoaSerializer implements SerializerDeserializer {
-
-  private ThreadLocal<Kryo> kryoThreadLocal;
-  private ThreadLocal<Output> outputThreadLocal;
-
-  private int initialBufferSize = 2048;
-  private int maxBufferSize = 256 * 1024;
-
-  public void setMaxBufferSize(int maxBufferSize) {
-    this.maxBufferSize = maxBufferSize;
-  }
-
-  /**
-   * 
-   * @param classLoader
-   *          classloader able to handle classes to serialize/deserialize. For 
instance, application-level events can
-   *          only be handled by the application classloader.
-   */
-  @Inject
-  public SamoaSerializer(@Assisted final ClassLoader classLoader) {
-    kryoThreadLocal = new ThreadLocal<Kryo>() {
-
-      @Override
-      protected Kryo initialValue() {
-        Kryo kryo = new Kryo();
-        kryo.setClassLoader(classLoader);
-        kryo.register(AttributeContentEvent.class, new 
AttributeContentEvent.AttributeCEFullPrecSerializer());
-        kryo.register(ComputeContentEvent.class, new 
ComputeContentEvent.ComputeCEFullPrecSerializer());
-        kryo.setRegistrationRequired(false);
-        return kryo;
-      }
-    };
-
-    outputThreadLocal = new ThreadLocal<Output>() {
-      @Override
-      protected Output initialValue() {
-        Output output = new Output(initialBufferSize, maxBufferSize);
-        return output;
-      }
-    };
-
-  }
-
-  @Override
-  public Object deserialize(ByteBuffer rawMessage) {
-    Input input = new Input(rawMessage.array());
-    try {
-      return kryoThreadLocal.get().readClassAndObject(input);
-    } finally {
-      input.close();
-    }
-  }
-
-  @SuppressWarnings("resource")
-  @Override
-  public ByteBuffer serialize(Object message) {
-    Output output = outputThreadLocal.get();
-    try {
-      kryoThreadLocal.get().writeClassAndObject(output, message);
-      return ByteBuffer.wrap(output.toBytes());
-    } finally {
-      output.clear();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/de050f99/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
 
b/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
deleted file mode 100644
index e530a09..0000000
--- 
a/samoa-s4/src/main/java/org/apache/samoa/topology/impl/SamoaSerializerModule.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.apache.s4.base.SerializerDeserializer;
-
-import com.google.inject.AbstractModule;
-
-public class SamoaSerializerModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(SerializerDeserializer.class).to(SamoaSerializer.class);
-
-  }
-
-}

Reply via email to