Repository: incubator-samoa
Updated Branches:
  refs/heads/master 2c26c0465 -> 1c858e1e6


SAMOA-49: Added adapter for Apache Apex


Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/1c858e1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/1c858e1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/1c858e1e

Branch: refs/heads/master
Commit: 1c858e1e60efa747f872e664c866a63fb2facacf
Parents: 2c26c04
Author: bhupeshchawda <[email protected]>
Authored: Tue Sep 22 16:34:14 2015 +0530
Committer: bhupeshchawda <[email protected]>
Committed: Fri Nov 4 15:42:44 2016 +0530

----------------------------------------------------------------------
 README.md                                       |   9 ++
 bin/samoa                                       |  78 +++++++++
 bin/samoa-apex.properties                       |  53 ++++++
 pom.xml                                         |  12 ++
 samoa-apex/README.md                            |  64 ++++++++
 samoa-apex/pom.xml                              | 160 +++++++++++++++++++
 .../java/org/apache/samoa/apex/ApexDoTask.java  |  75 +++++++++
 .../org/apache/samoa/apex/ApexSamoaUtils.java   |  62 +++++++
 .../org/apache/samoa/apex/LocalApexDoTask.java  |  65 ++++++++
 .../apache/samoa/apex/StreamingAppFactory.java  |  52 ++++++
 .../topology/impl/ApexComponentFactory.java     |  91 +++++++++++
 .../impl/ApexEntranceProcessingItem.java        |  69 ++++++++
 .../apex/topology/impl/ApexInputOperator.java   |  91 +++++++++++
 .../samoa/apex/topology/impl/ApexOperator.java  | 158 ++++++++++++++++++
 .../apex/topology/impl/ApexProcessingItem.java  | 113 +++++++++++++
 .../samoa/apex/topology/impl/ApexStream.java    |  57 +++++++
 .../apex/topology/impl/ApexStreamUtils.java     | 135 ++++++++++++++++
 .../samoa/apex/topology/impl/ApexTask.java      | 151 +++++++++++++++++
 .../samoa/apex/topology/impl/ApexTopology.java  |  71 ++++++++
 .../apex/topology/impl/ApexTopologyNode.java    |  38 +++++
 .../impl/DefaultInputPortSerializable.java      |  31 ++++
 .../impl/DefaultOutputPortSerializable.java     |  31 ++++
 .../impl/DelayOperatorSerializable.java         |  60 +++++++
 samoa-apex/src/main/resources/log4j.properties  |  42 +++++
 .../org/apache/samoa/apex/AlgosTestApex.java    |  68 ++++++++
 samoa-apex/src/test/resources/log4j.properties  |  42 +++++
 .../trees/ModelAggregatorProcessor.java         |   5 +-
 .../apache/samoa/streams/ArffFileStream.java    |   2 +-
 28 files changed, 1882 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 4229864..d73860f 100644
--- a/README.md
+++ b/README.md
@@ -56,6 +56,15 @@ cd incubator-samoa
 mvn -Ps4 package
 ```
 
+###Apex mode
+
+Simply clone the repository and create SAMOA with the Apex package.
+```bash
+git clone http://git.apache.org/incubator-samoa.git
+cd incubator-samoa
+mvn -Papex package
+```
+
 ###Local mode
 
 If you want to test SAMOA in a local environment, simply clone the repository 
and install SAMOA.

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/bin/samoa
----------------------------------------------------------------------
diff --git a/bin/samoa b/bin/samoa
index 5f41ce6..5e22eae 100755
--- a/bin/samoa
+++ b/bin/samoa
@@ -344,6 +344,84 @@ elif [ $PLATFORM = 'FLINK' ]; then
                 $FLINK_EXEC run $DEPLOYABLE $COMPLETE_ARG
         fi                                                    
 
+elif [ $PLATFORM = 'APEX' ]; then
+
+    if [ ! -f $2 ];then
+        echo "$2 does not exist!"
+        echo "Please use a valid jar file for Apex execution"
+        exit -1
+    fi
+
+    VALUE=""
+    # Looks up property "$1" in $BASE_DIR/$SAMOA_APEX_PROPERTIES and stores
+    # its value in the global VALUE. Lines starting with '#' are dropped,
+    # the last matching line wins, and the text after the first '=' is
+    # trimmed of surrounding whitespace. VALUE is empty if nothing matches.
+    # The key is anchored to the start of the line and must be followed by
+    # '=' so that it cannot match inside another key or inside a value.
+    getvalue()
+    {
+        VALUE=`sed '/^\#/d' "$BASE_DIR/$SAMOA_APEX_PROPERTIES" |
+            grep "^[[:space:]]*$1[[:space:]]*=" | tail -n 1 |
+            cut -d "=" -f2- |
+            sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
+    }
+
+    SAMOA_APEX_PROPERTIES="samoa-apex.properties"
+    MODE_OPTION="samoa.apex.mode"
+    DT_DFS_DIR_OPTION="dt.dfsRootDirectory"
+    DT_SITE_OPTION="dt.site.path"
+    DEFAULT_FS_OPTION="fs.defaultFS"
+    RM_OPTION="yarn.resourcemanager.address"
+    HADOOP_HDFS_HOME_OPTION="hadoop.hdfs.home"
+    HADOOP_YARN_HOME_OPTION="hadoop.yarn.home"
+    HADOOP_COMMON_HOME_OPTION="hadoop.common.home"
+    HADOOP_CONF_DIR_OPTION="hadoop.conf.dir"
+
+    getvalue "$MODE_OPTION"
+    MODE_ARG="$VALUE"
+
+    if [[ "$MODE_ARG" = "cluster" && -z $HADOOP_HOME ]];then
+        echo "HADOOP_HOME is not set!"
+        echo "Please set HADOOP_HOME to point to your Hadoop installation"
+        exit -1
+    fi
+
+    getvalue "$DT_DFS_DIR_OPTION"
+    DT_DFS_DIR="$VALUE"
+
+    getvalue "$DEFAULT_FS_OPTION"
+    DEFAULT_FS="$VALUE"
+
+    getvalue "$DT_SITE_OPTION"
+    DT_SITE=`echo $(eval echo "$VALUE")`
+
+    getvalue "$RM_OPTION"
+    RM="$VALUE"
+
+    getvalue "$HADOOP_HDFS_HOME_OPTION"
+    HADOOP_HDFS_HOME=`echo $(eval echo "$VALUE")`
+
+    getvalue "$HADOOP_YARN_HOME_OPTION"
+    HADOOP_YARN_HOME=`echo $(eval echo "$VALUE")`
+
+    getvalue "$HADOOP_COMMON_HOME_OPTION"
+    HADOOP_COMMON_HOME=`echo $(eval echo "$VALUE")`
+
+    getvalue "$HADOOP_CONF_DIR_OPTION"
+    HADOOP_CONF_DIR=`echo $(eval echo "$VALUE")`
+
+    DEPLOYABLE=$JAR_PATH
+    echo "Deployable: " $DEPLOYABLE
+
+    #LIB_PREFIX=/opt/cloudera/parcels/CDH/lib/
+    
YARN_CLASSPATH=$HADOOP_HDFS_HOME/*:$HADOOP_YARN_HOME/*:$HADOOP_COMMON_HOME/*:$HADOOP_CONF_DIR
+    echo "YARN CLASSPATH: " $YARN_CLASSPATH
+
+    if [ "$MODE_ARG" = "cluster" ]; then
+        CMD="java -cp .:$YARN_CLASSPATH:$DEPLOYABLE 
-D$DT_DFS_DIR_OPTION=$DT_DFS_DIR -D$DEFAULT_FS_OPTION=$DEFAULT_FS 
-D$DT_SITE_OPTION=$DT_SITE -D$RM_OPTION=$RM org.apache.samoa.apex.ApexDoTask 
${OPTIONS}"
+        echo Command: $CMD
+        ${CMD}
+    elif [ "$MODE_ARG" = "local" ]
+    then
+        CMD="java -cp .:$DEPLOYABLE -D$DT_SITE_OPTION=$DT_SITE 
org.apache.samoa.apex.LocalApexDoTask ${OPTIONS}"
+        echo Command: $CMD
+        ${CMD}
+    else
+        echo "Wrong mode argument. Check property samoa.apex.mode in 
samoa-apex.properties file"
+    fi
+
 elif [ $PLATFORM = 'THREADS' ]; then
        
        echo "Deploying to LOCAL with MULTITHREADING."

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/bin/samoa-apex.properties
----------------------------------------------------------------------
diff --git a/bin/samoa-apex.properties b/bin/samoa-apex.properties
new file mode 100644
index 0000000..9e12f31
--- /dev/null
+++ b/bin/samoa-apex.properties
@@ -0,0 +1,53 @@
+###
+# #%L
+# SAMOA
+# %%
+# Copyright (C) 2014 - 2015 Apache Software Foundation
+# %%
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# #L%
+###
+
+### Sets the mode of execution for Apex ###
+# "local" - Execute in local mode. No cluster setup needed.
+# "cluster" - Execute in cluster mode. Needs a working Hadoop 2.0 (YARN) 
cluster.
+samoa.apex.mode=local
+#samoa.apex.mode=cluster
+
+### The HDFS directory to use (in case of "cluster" mode) for storing app 
related data ###
+dt.dfsRootDirectory=/user/apex
+
+### The HDFS file system (In case of "cluster" mode) ###
+fs.defaultFS=hdfs://localhost:9000
+
+### The YARN resource manager address (In case of "cluster" mode) ###
+yarn.resourcemanager.address=localhost:8032
+
+### Path to hadoop common libs (In case of "cluster" mode) ###
+# Set this appropriately to the lib directory of hadoop-common if using a 
custom hadoop distribution
+hadoop.common.home=$HADOOP_HOME/share/hadoop/common
+
+### Path to hdfs libs (In case of "cluster" mode) ###
+# Set this appropriately to the lib directory of hadoop-hdfs if using a custom 
hadoop distribution
+hadoop.hdfs.home=$HADOOP_HOME/share/hadoop/hdfs
+
+### Path to yarn libs (In case of "cluster" mode) ###
+# Set this appropriately to the lib directory of hadoop-yarn if using a custom 
hadoop distribution
+hadoop.yarn.home=$HADOOP_HOME/share/hadoop/yarn
+
+### Path to hadoop conf dir (In case of "cluster" mode) ###
+# Set this appropriately to the conf directory of hadoop if using a custom 
hadoop distribution
+hadoop.conf.dir=$HADOOP_HOME/etc/hadoop
+
+### Additional configuration properties, if needed, can be specified in this 
file (In case of "cluster" mode) ###
+dt.site.path=$HOME/.dt/dt-site.xml

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 963d9f8..ecc713d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,15 @@
             </modules>
         </profile>
         <profile>
+            <id>apex</id>
+            <modules>
+                <module>samoa-instances</module>
+                <module>samoa-api</module>
+                <module>samoa-apex</module>
+                <module>samoa-test</module>
+            </modules>
+        </profile>
+        <profile>
             <id>flink</id>
             <modules>
                 <module>samoa-instances</module>
@@ -101,6 +110,7 @@
                 <module>samoa-local</module>
                 <module>samoa-threads</module>
                 <module>samoa-storm</module>
+                <module>samoa-apex</module>
                 <module>samoa-flink</module>
                 <module>samoa-samza</module>
                 <module>samoa-test</module>
@@ -127,6 +137,7 @@
         <miniball.version>1.0.3</miniball.version>
         <samza.version>0.7.0</samza.version>
         <flink.version>0.10.1</flink.version>
+        <apex.version>3.4.0</apex.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
@@ -209,6 +220,7 @@
                         <root>samoa-local</root>
                         <root>samoa-storm</root>
                         <root>samoa-flink</root>
+                        <root>samoa-apex</root>
                         <root>samoa-samza</root>
                         <root>samoa-test</root>
                         <root>samoa-threads</root>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/README.md
----------------------------------------------------------------------
diff --git a/samoa-apex/README.md b/samoa-apex/README.md
new file mode 100644
index 0000000..7316e4d
--- /dev/null
+++ b/samoa-apex/README.md
@@ -0,0 +1,64 @@
+# Execution Instructions
+
+This explains the build and run instructions for Samoa on Apache Apex 
(http://apex.apache.org/)
+
+## Build
+
+Simply clone the repository and create SAMOA with the Apex package.
+```bash
+git clone http://git.apache.org/incubator-samoa.git
+cd incubator-samoa
+mvn -Papex package
+```
+
+The deployable jar will be present in 
`target/SAMOA-Apex-0.5.0-incubating-SNAPSHOT.jar`.
+
+## Running Samoa Algorithms on Apex in Local mode
+- Edit samoa-apex.properties. Make sure `samoa.apex.mode` is set to `local`
+- Run the deployable jar from the top level Samoa directory using the 
parameters for the algorithm. For example, for running the VHT classifier, run: 
`bin/samoa apex target/SAMOA-Apex-0.5.0-incubating-SNAPSHOT.jar 
"PrequentialEvaluation -d /tmp/dump.csv -i 500000 -l 
(classifiers.trees.VerticalHoeffdingTree -p 1) -s 
(generators.RandomTreeGenerator -c 2 -o 5 -u 5)"`
+
+## Running Samoa Algorithms on Apex in Cluster mode
+- A running Hadoop 2.0 (YARN) cluster is necessary for running Apex in cluster 
mode.
+- Edit samoa-apex.properties.
+ - Make sure `samoa.apex.mode` is set to `cluster`
+ - Set the `dt.dfsRootDirectory` parameter to point to a valid HDFS directory
+ - Set the `fs.defaultFS` parameter to point to the name node service of 
the Hadoop cluster
+- Run the deployable jar from the top level Samoa directory using the 
parameters for the algorithm. For example, for running the VHT classifier, run: 
`bin/samoa apex target/SAMOA-Apex-0.5.0-incubating-SNAPSHOT.jar 
"PrequentialEvaluation -d /tmp/dump.csv -i 500000 -l 
(classifiers.trees.VerticalHoeffdingTree -p 1) -s 
(generators.RandomTreeGenerator -c 2 -o 5 -u 5)"`
+
+## Notes on configuration
+- Apex, being a windowed stream processing engine, has certain limitations on 
the number of tuples that can be processed per second. This might require the 
user to limit the speed of tuples being sent by the Entrance Processing Item. 
The user can set the number of tuples per window (`tuplesLimitPerWindow` 
parameter) through the `dt-site.xml` config file. The path to this file needs 
to be set in the `samoa-apex.properties` file as `dt.site.path`. Following is an 
example configuration which limits the speed to `2000` tuples per window. For 
Apex, with the default streaming window size of `500` ms, this amounts to a 
speed of `4000` tuples per second.
+This configuration parameter is specific to the type of data and the type of 
topology run and may not be optimal for all the topologies. A useful guide on 
choosing this parameter is to check if the latency of the operators in the Dag 
stays within an acceptable range. If not, the operator is not able to handle 
this load and this parameter must be decreased.
+
+ ```
+ <property>
+   <name>dt.operator.*.prop.tuplesLimitPerWindow</name>
+   <value>2000</value>
+ </property>
+ ```
+ This will change the limit of all the operators present in the Dag. However, 
this will affect only input operators as other operators do not have this 
property. This is convenient as we don't need to know the name of the input 
operator corresponding to the Entrance Processing Item in the topology.
+
+- Apart from the above configuration, the Apex engine is highly configurable 
externally via the `dt-site.xml` file which is specified as the `dt.site.path` 
property in the `samoa-apex.properties` file. Some of the attributes which can 
be modified are as follows:
+ 1. Memory allocated to an operator - `MEMORY_MB` 
http://docs.datatorrent.com/beginner/#allocating-operator-memory
+ 2. Size of the streaming window - `STREAMING_WINDOW_SIZE_MILLIS` 
http://docs.datatorrent.com/tutorials/topnwords-c7/#streaming-windows-and-application-windows
+ 3. Checkpoint window size - `CHECKPOINT_WINDOW_COUNT` 
https://apex.apache.org/docs/apex/application_development/#checkpointing
+
+   Please refer to the following for more information on what attributes can 
be specified externally and their impact on the processing engine. *However, 
note that most of these would not be applicable to applications running on 
Samoa, as the topology and its properties are already defined by Samoa. The 
Apex runner is for running the topology by converting it to an Apex Dag.*
+   - 
https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.DAGContext.html
+   - 
https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Context.OperatorContext.html
+
+- To enable debug logging, add the following configuration to the 
`dt-site.xml` file at the location specified by `dt.site.path`
+ ```
+ <property>
+   <name>dt.loggers.level</name>
+   <value>org.apache.*:DEBUG,com.datatorrent.*:DEBUG</value>
+ </property>
+ ```
+
+ ## Using Apex cli
+The user can view details about any application launched via Apex using the 
cli. The apex-core project must be checked out to some directory.
+Launch the apex cli located at: ```apex-core/engine/src/main/scripts/apex```
+The following can be achieved using the cli:
+ - View running apps: ```list-apps```
+ - View app info:
+   - Connect to the running application using : ```connect <app-id>```
+   - Run ```get-app-info <app id>```

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-apex/pom.xml b/samoa-apex/pom.xml
new file mode 100644
index 0000000..018ff07
--- /dev/null
+++ b/samoa-apex/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2014 - 2016 Apache Software Foundation
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
+  </properties>
+
+  <name>samoa-apex</name>
+  <description>Apex bindings for SAMOA</description>
+
+  <artifactId>samoa-apex</artifactId>
+  <parent>
+    <groupId>org.apache.samoa</groupId>
+    <artifactId>samoa</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <repositories>
+    <repository>
+      <id>apache.snapshots</id>
+      <name>Apache Development Snapshot Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.samoa</groupId>
+      <artifactId>samoa-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.samoa</groupId>
+      <artifactId>samoa-test</artifactId>
+      <type>test-jar</type>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.storm.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j-log4j12.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.hibernate</groupId>
+      <artifactId>hibernate-validator-annotation-processor</artifactId>
+      <version>4.1.0.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.apex</groupId>
+      <artifactId>apex-engine</artifactId>
+      <version>${apex.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware.kryo</groupId>
+      <artifactId>kryo</artifactId>
+      <version>2.24.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>log4j-over-slf4j</artifactId>
+      <version>1.7.13</version>
+    </dependency>
+    <dependency>
+      <groupId>com.github.javacliparser</groupId>
+      <artifactId>javacliparser</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+  </dependencies>
+
+
+
+  <build>
+    <plugins>
+      <!-- Apex assembly -->
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven-assembly-plugin.version}</version>
+        <configuration>
+          <finalName>SAMOA-Apex-${project.version}</finalName>
+          <appendAssemblyId>false</appendAssemblyId>
+          <attach>false</attach>
+          <outputDirectory>../target</outputDirectory>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <archive>
+            <manifestEntries>
+              
<!--<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version> -->
+              <Bundle-Description>${project.description}</Bundle-Description>
+              
<Implementation-Version>${project.version}</Implementation-Version>
+              <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
+              <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+            </manifestEntries>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id> <!-- this is used for inheritance merges -->
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <argLine>-Xmx1G</argLine>
+          <redirectTestOutputToFile>false</redirectTestOutputToFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/ApexDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-apex/src/main/java/org/apache/samoa/apex/ApexDoTask.java 
b/samoa-apex/src/main/java/org/apache/samoa/apex/ApexDoTask.java
new file mode 100644
index 0000000..e074a56
--- /dev/null
+++ b/samoa-apex/src/main/java/org/apache/samoa/apex/ApexDoTask.java
@@ -0,0 +1,75 @@
+package org.apache.samoa.apex;
+
+import java.io.File;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.samoa.apex.topology.impl.ApexTask;
+import org.apache.samoa.apex.topology.impl.ApexTopology;
+
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.client.StramAppLauncher;
+
+/**
+ * Command line entry point that launches a SAMOA task through
+ * {@link StramAppLauncher}.
+ *
+ * <p>The arguments are converted to an {@link ApexTopology}, wrapped in an
+ * {@link ApexTask} and handed to the launcher. The launch configuration is
+ * read from the JVM system properties {@code dt.dfsRootDirectory},
+ * {@code fs.defaultFS}, {@code yarn.resourcemanager.address} and
+ * {@code dt.site.path}.
+ */
+public class ApexDoTask {
+
+  private static final String DFS_ROOT_DIR_KEY = "dt.dfsRootDirectory";
+  private static final String DEFAULT_FS_KEY = "fs.defaultFS";
+  private static final String RM_ADDRESS_KEY = "yarn.resourcemanager.address";
+  private static final String SITE_PATH_KEY = "dt.site.path";
+
+  /** Topology built from the command line arguments; set by {@link #main}. */
+  public static ApexTopology apexTopo;
+
+  /**
+   * Builds the topology from {@code args} and launches it.
+   *
+   * @param args SAMOA task description, as passed on the command line
+   * @throws RuntimeException wrapping any exception raised by the launch
+   */
+  public static void main(String[] args) {
+    apexTopo = ApexSamoaUtils.argsToTopology(args);
+    try {
+      startLaunch();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Wraps {@link #apexTopo} in an {@link ApexTask} and launches it under
+   * the application name "Apex App".
+   *
+   * @throws Exception if the launch fails
+   */
+  public static void startLaunch() throws Exception {
+    ApexTask streamingApp = new ApexTask(apexTopo);
+    launch(streamingApp, "Apex App");
+  }
+
+  /**
+   * Launches {@code app} with a configuration assembled from the required
+   * system properties and the site file named by {@code dt.site.path}.
+   *
+   * @param app the application to launch
+   * @param name the application name given to the launcher
+   * @param libjars value for {@link StramAppLauncher#LIBJARS_CONF_KEY_NAME};
+   *        ignored when {@code null}
+   * @throws IllegalArgumentException if a required system property is unset
+   * @throws Exception if the launch fails
+   */
+  public static void launch(StreamingApplication app, String name,
+      String libjars) throws Exception {
+    Configuration conf = new Configuration(true);
+    // For debug logging, uncomment:
+    // conf.set("dt.loggers.level",
+    //     "org.apache.*:DEBUG, com.datatorrent.*:DEBUG");
+
+    // Check each property up front so that a missing -D option is reported
+    // by name instead of surfacing as a bare NullPointerException.
+    conf.set(DFS_ROOT_DIR_KEY, requireProperty(DFS_ROOT_DIR_KEY));
+    conf.set(DEFAULT_FS_KEY, requireProperty(DEFAULT_FS_KEY));
+    conf.set(RM_ADDRESS_KEY, requireProperty(RM_ADDRESS_KEY));
+    File siteFile = new File(requireProperty(SITE_PATH_KEY));
+    conf.addResource(siteFile.toURI().toURL());
+
+    if (libjars != null) {
+      conf.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, libjars);
+    }
+    StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
+    appLauncher.loadDependencies();
+    StreamingAppFactory appFactory = new StreamingAppFactory(app, name);
+    appLauncher.launchApp(appFactory);
+  }
+
+  /**
+   * Launches {@code app} without additional library jars.
+   *
+   * @param app the application to launch
+   * @param name the application name given to the launcher
+   * @throws Exception if the launch fails
+   */
+  public static void launch(StreamingApplication app, String name)
+      throws Exception {
+    launch(app, name, null);
+  }
+
+  /**
+   * @return the topology built by {@link #main}, or {@code null} if
+   *         {@code main} has not assigned one
+   */
+  public static ApexTopology getTopology() {
+    return apexTopo;
+  }
+
+  /** Returns the system property {@code key}, failing if it is unset. */
+  private static String requireProperty(String key) {
+    String value = System.getProperty(key);
+    if (value == null) {
+      throw new IllegalArgumentException(
+          "Required system property '" + key + "' is not set");
+    }
+    return value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/ApexSamoaUtils.java
----------------------------------------------------------------------
diff --git a/samoa-apex/src/main/java/org/apache/samoa/apex/ApexSamoaUtils.java 
b/samoa-apex/src/main/java/org/apache/samoa/apex/ApexSamoaUtils.java
new file mode 100644
index 0000000..1371e44
--- /dev/null
+++ b/samoa-apex/src/main/java/org/apache/samoa/apex/ApexSamoaUtils.java
@@ -0,0 +1,62 @@
+package org.apache.samoa.apex;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+
+import org.apache.samoa.apex.topology.impl.ApexComponentFactory;
+import org.apache.samoa.apex.topology.impl.ApexTopology;
+import org.apache.samoa.tasks.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helpers that turn SAMOA command line arguments into an
+ * {@link ApexTopology}.
+ */
+public class ApexSamoaUtils {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(ApexSamoaUtils.class);
+
+  /**
+   * Joins {@code args} into one CLI string, creates the task it describes,
+   * initialises it with an {@link ApexComponentFactory} and returns the
+   * resulting topology.
+   *
+   * @param args SAMOA task description, one token per element
+   * @return the topology of the initialised task
+   * @throws IllegalArgumentException if no task could be created from
+   *         {@code args}
+   */
+  public static ApexTopology argsToTopology(String[] args) {
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+
+    Task task = getTask(cliString.toString());
+    if (task == null) {
+      // getTask has already logged the cause; fail with a clear message
+      // rather than a NullPointerException on the next line.
+      throw new IllegalArgumentException(
+          "Could not create a task from arguments:" + cliString);
+    }
+
+    // TODO: remove setFactory method with DynamicBinding
+    task.setFactory(new ApexComponentFactory());
+    task.init();
+
+    return (ApexTopology) task.getTopology();
+  }
+
+  /**
+   * Creates the task described by {@code cliString}.
+   *
+   * @param cliString the task description in CLI syntax
+   * @return the task, or {@code null} if it could not be created; the
+   *         failure is logged together with its cause
+   */
+  public static Task getTask(String cliString) {
+    Task task = null;
+    try {
+      logger.debug("Providing task [{}]", cliString);
+      task = ClassOption.cliStringToObject(cliString, Task.class, null);
+    } catch (Exception e) {
+      // Pass the exception to the logger so the stack trace ends up in the
+      // configured log output instead of on stderr.
+      logger.warn("Fail in initializing the task!", e);
+    }
+    return task;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/LocalApexDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/LocalApexDoTask.java 
b/samoa-apex/src/main/java/org/apache/samoa/apex/LocalApexDoTask.java
new file mode 100644
index 0000000..8077286
--- /dev/null
+++ b/samoa-apex/src/main/java/org/apache/samoa/apex/LocalApexDoTask.java
@@ -0,0 +1,65 @@
+package org.apache.samoa.apex;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.samoa.apex.topology.impl.ApexTask;
+import org.apache.samoa.apex.topology.impl.ApexTopology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+/**
+ * Command line entry point that runs a SAMOA task through Apex
+ * {@link LocalMode}.
+ */
+public class LocalApexDoTask {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(LocalApexDoTask.class);
+
+  /**
+   * Builds the topology from {@code args}, prepares and validates the DAG
+   * and starts it with {@code runAsync()}.
+   *
+   * @param args SAMOA task description, as passed on the command line
+   * @throws RuntimeException wrapping any exception raised while the DAG
+   *         is prepared or validated
+   */
+  public static void main(String[] args) {
+    ApexTopology apexTopo = ApexSamoaUtils.argsToTopology(args);
+
+    LocalMode lma = LocalMode.newInstance();
+    Configuration conf = new Configuration(false);
+    // For debug logging, uncomment:
+    // conf.set("dt.loggers.level", "org.apache.*:DEBUG");
+
+    try {
+      lma.prepareDAG(new ApexTask(apexTopo), conf);
+      logger.info("Dag Set in lma: {}", lma.getDAG());
+      ((LogicalPlan) lma.getDAG()).validate();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    lc.runAsync();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/StreamingAppFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/StreamingAppFactory.java 
b/samoa-apex/src/main/java/org/apache/samoa/apex/StreamingAppFactory.java
new file mode 100644
index 0000000..b298f8c
--- /dev/null
+++ b/samoa-apex/src/main/java/org/apache/samoa/apex/StreamingAppFactory.java
@@ -0,0 +1,52 @@
+package org.apache.samoa.apex;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.datatorrent.api.StreamingApplication;
+
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * {@link StramAppLauncher.AppFactory} backed by an already constructed
+ * {@link StreamingApplication} and a fixed application name.
+ */
+public class StreamingAppFactory implements StramAppLauncher.AppFactory {
+
+  private StreamingApplication app;
+  private String name;
+
+  /**
+   * @param app the application used to populate the DAG
+   * @param name the name reported by {@link #getName()} and
+   *        {@link #getDisplayName()}
+   */
+  public StreamingAppFactory(StreamingApplication app, String name) {
+    this.name = name;
+    this.app = app;
+  }
+
+  /**
+   * Creates a new {@link LogicalPlan} and lets {@code planConfig} prepare
+   * it from the wrapped application under this factory's name.
+   *
+   * @param planConfig the configuration that prepares the DAG
+   * @return the prepared plan
+   */
+  public LogicalPlan createApp(LogicalPlanConfiguration planConfig) {
+    LogicalPlan plan = new LogicalPlan();
+    planConfig.prepareDAG(plan, this.app, this.getName());
+    return plan;
+  }
+
+  /** @return the application name given at construction */
+  public String getName() {
+    return this.name;
+  }
+
+  /** @return the same value as {@link #getName()} */
+  public String getDisplayName() {
+    return this.name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexComponentFactory.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexComponentFactory.java
new file mode 100644
index 0000000..446b18f
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexComponentFactory.java
@@ -0,0 +1,91 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+/**
+ * Component factory implementation for samoa-apex
+ */
+public final class ApexComponentFactory implements ComponentFactory {
+
+  private final Map<String, Integer> processorList;
+
+  public ApexComponentFactory() {
+    processorList = new HashMap<>();
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return new ApexProcessingItem(processor, 
this.getComponentName(processor.getClass()), 1);
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor, int parallelism) {
+    return new ApexProcessingItem(processor, 
this.getComponentName(processor.getClass()), parallelism);
+  }
+
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new ApexEntranceProcessingItem(processor, 
this.getComponentName(processor.getClass()));
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    ApexTopologyNode apexCompatiblePi = (ApexTopologyNode) sourcePi;
+    ApexStream stream = apexCompatiblePi.createStream();
+    return stream;
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new ApexTopology(topoName);
+  }
+
+  private String getComponentName(Class<? extends Processor> clazz) {
+    StringBuilder componentName = new StringBuilder(clazz.getCanonicalName());
+    String key = componentName.toString();
+    Integer index;
+
+    if (!processorList.containsKey(key)) {
+      index = 1;
+    } else {
+      index = processorList.get(key) + 1;
+    }
+
+    processorList.put(key, index);
+
+    componentName.append('_');
+    componentName.append(index);
+
+    return componentName.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexEntranceProcessingItem.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexEntranceProcessingItem.java
new file mode 100644
index 0000000..ee855f0
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexEntranceProcessingItem.java
@@ -0,0 +1,69 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.UUID;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+
+/**
+ * EntranceProcessingItem implementation for Apache Apex. Wraps the entrance processor in an
+ * {@link ApexInputOperator} and registers that operator in the topology's DAG under this item's name.
+ */
+public class ApexEntranceProcessingItem extends AbstractEntranceProcessingItem implements ApexTopologyNode {
+
+  private final ApexInputOperator inputOperator;
+  /** Number of streams created so far; used as the suffix of the next stream id. */
+  private int numStreams;
+
+  /** Creates an item without an input operator. */
+  public ApexEntranceProcessingItem() {
+    inputOperator = null;
+  }
+
+  /** Creates an item named with a random UUID. */
+  public ApexEntranceProcessingItem(EntranceProcessor processor) {
+    this(processor, UUID.randomUUID().toString());
+  }
+
+  /**
+   * @param processor
+   *          entrance processor to wrap
+   * @param friendlyId
+   *          name of this item; also the operator name used in {@link #addToTopology(ApexTopology, int)}
+   */
+  public ApexEntranceProcessingItem(EntranceProcessor processor, String friendlyId) {
+    super(processor);
+    this.setName(friendlyId);
+    this.inputOperator = new ApexInputOperator(processor);
+  }
+
+  /**
+   * Adds the input operator to the topology's DAG under this item's name. {@code parallelismHint} is currently
+   * ignored.
+   */
+  @Override
+  public void addToTopology(ApexTopology topology, int parallelismHint) {
+    final String operatorName = this.getName();
+    topology.getDAG().addOperator(operatorName, inputOperator);
+    //add num partitions
+  }
+
+  /** Creates the next stream originating from the input operator; ids end in a running counter. */
+  @Override
+  public ApexStream createStream() {
+    final String streamId = "Stream_from_" + this.getName() + "_#" + numStreams;
+    numStreams++;
+    return inputOperator.createStream(streamId);
+  }
+
+  /** @return the superclass description prefixed with {@code "id: <name>, "} */
+  @Override
+  public String toString() {
+    final String base = super.toString();
+    return String.format("id: %s, ", this.getName()).concat(base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexInputOperator.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexInputOperator.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexInputOperator.java
new file mode 100644
index 0000000..46c30e3
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexInputOperator.java
@@ -0,0 +1,91 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * Apex input operator that pulls events from a SAMOA {@link EntranceProcessor} and emits them on a single output
+ * port. Each call to {@link #emitTuples()} emits at most one event, and no more than
+ * {@link #getTuplesLimitPerWindow()} events are emitted between two {@link #beginWindow(long)} calls.
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class ApexInputOperator extends BaseOperator implements InputOperator, Serializable {
+
+  /** Limit applied when {@code tuplesLimitPerWindow} is still 0 at {@link #setup(OperatorContext)}. */
+  private static final long DEFAULT_LIMIT = 2000;
+  private long tuplesLimitPerWindow; // default
+  private static final long serialVersionUID = 4255026962166445721L;
+  private final EntranceProcessor entranceProcessor;
+  private final DefaultOutputPortSerializable<ContentEvent> outputPort = new DefaultOutputPortSerializable<>();
+  /** Events emitted since the last {@link #beginWindow(long)}. */
+  private transient int numTuples;
+
+  /** Creates an operator without an entrance processor. */
+  public ApexInputOperator() {
+    entranceProcessor = null;
+  }
+
+  ApexInputOperator(EntranceProcessor processor) {
+    this.entranceProcessor = processor;
+  }
+
+  /**
+   * Creates a stream with the given id whose output port is this operator's output port.
+   */
+  ApexStream createStream(String piId) {
+    final ApexStream created = new ApexStream(piId);
+    created.outputPort = this.outputPort;
+    return created;
+  }
+
+  /**
+   * Calls {@code onCreate} on the entrance processor with the operator id and falls back to
+   * {@link #DEFAULT_LIMIT} when no per-window limit has been configured (value 0).
+   */
+  @Override
+  public void setup(OperatorContext context) {
+    this.entranceProcessor.onCreate(context.getId());
+    if (tuplesLimitPerWindow != 0) {
+      return;
+    }
+    tuplesLimitPerWindow = DEFAULT_LIMIT;
+  }
+
+  /** Resets the per-window emit counter. */
+  @Override
+  public void beginWindow(long windowId) {
+    super.beginWindow(windowId);
+    numTuples = 0;
+  }
+
+  /** Emits the next event, if the processor has one and the per-window limit has not been reached. */
+  @Override
+  public void emitTuples() {
+    if (!entranceProcessor.hasNext() || numTuples >= tuplesLimitPerWindow) {
+      return;
+    }
+    outputPort.emit(entranceProcessor.nextEvent());
+    numTuples++;
+  }
+
+  public long getTuplesLimitPerWindow() {
+    return tuplesLimitPerWindow;
+  }
+
+  public void setTuplesLimitPerWindow(long tuplesLimitPerWindow) {
+    this.tuplesLimitPerWindow = tuplesLimitPerWindow;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexOperator.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexOperator.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexOperator.java
new file mode 100644
index 0000000..14b357e
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexOperator.java
@@ -0,0 +1,158 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * Apex Operator
+ *
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class ApexOperator extends BaseOperator implements Serializable {
+
+  private static final long serialVersionUID = -6637673741263199198L;
+  public final Processor processor;
+  public int instances = 1; // Default
+
+  public boolean[] usedInputPorts = new boolean[] { false, false, false, 
false, false };
+  public boolean[] usedOutputPorts = new boolean[] { false, false, false, 
false, false };
+
+  public ApexOperator() {
+    processor = null;
+  }
+
+  @SuppressWarnings("serial")
+  @InputPortFieldAnnotation(optional = true)
+  public DefaultInputPortSerializable<ContentEvent> inputPort0 = new 
DefaultInputPortSerializable<ContentEvent>() {
+    @Override
+    public void process(ContentEvent tuple) {
+      processor.process(tuple);
+    }
+  };
+  @SuppressWarnings("serial")
+  @InputPortFieldAnnotation(optional = true)
+  public DefaultInputPortSerializable<ContentEvent> inputPort1 = new 
DefaultInputPortSerializable<ContentEvent>() {
+    @Override
+    public void process(ContentEvent tuple) {
+      processor.process(tuple);
+    }
+  };
+  @SuppressWarnings("serial")
+  @InputPortFieldAnnotation(optional = true)
+  public DefaultInputPortSerializable<ContentEvent> inputPort2 = new 
DefaultInputPortSerializable<ContentEvent>() {
+    @Override
+    public void process(ContentEvent tuple) {
+      processor.process(tuple);
+    }
+  };
+  @SuppressWarnings("serial")
+  @InputPortFieldAnnotation(optional = true)
+  public DefaultInputPortSerializable<ContentEvent> inputPort3 = new 
DefaultInputPortSerializable<ContentEvent>() {
+    @Override
+    public void process(ContentEvent tuple) {
+      processor.process(tuple);
+    }
+  };
+  @SuppressWarnings("serial")
+  @InputPortFieldAnnotation(optional = true)
+  public DefaultInputPortSerializable<ContentEvent> inputPort4 = new 
DefaultInputPortSerializable<ContentEvent>() {
+    @Override
+    public void process(ContentEvent tuple) {
+      processor.process(tuple);
+    }
+  };
+
+  @OutputPortFieldAnnotation(optional = true)
+  public DefaultOutputPortSerializable<ContentEvent> outputPort0 = new 
DefaultOutputPortSerializable<ContentEvent>();
+  @OutputPortFieldAnnotation(optional = true)
+  public DefaultOutputPortSerializable<ContentEvent> outputPort1 = new 
DefaultOutputPortSerializable<ContentEvent>();
+  @OutputPortFieldAnnotation(optional = true)
+  public DefaultOutputPortSerializable<ContentEvent> outputPort2 = new 
DefaultOutputPortSerializable<ContentEvent>();
+  @OutputPortFieldAnnotation(optional = true)
+  public DefaultOutputPortSerializable<ContentEvent> outputPort3 = new 
DefaultOutputPortSerializable<ContentEvent>();
+  @OutputPortFieldAnnotation(optional = true)
+  public DefaultOutputPortSerializable<ContentEvent> outputPort4 = new 
DefaultOutputPortSerializable<ContentEvent>();
+
+  ApexOperator(Processor processor, int parallelismHint) {
+    this.processor = processor;
+    this.instances = parallelismHint;
+  }
+
+  @Override
+  public void setup(OperatorContext context) {
+    processor.onCreate(context.getId());
+  }
+
+  public ApexStream createStream(String id) {
+    ApexStream stream = new ApexStream(id);
+    if (!usedOutputPorts[0]) {
+      stream.outputPort = outputPort0;
+      usedOutputPorts[0] = true;
+    } else if (!usedOutputPorts[1]) {
+      stream.outputPort = outputPort1;
+      usedOutputPorts[1] = true;
+    } else if (!usedOutputPorts[2]) {
+      stream.outputPort = outputPort2;
+      usedOutputPorts[2] = true;
+    } else if (!usedOutputPorts[3]) {
+      stream.outputPort = outputPort3;
+      usedOutputPorts[3] = true;
+    } else if (!usedOutputPorts[4]) {
+      stream.outputPort = outputPort4;
+      usedOutputPorts[4] = true;
+    } else {
+      throw new RuntimeException("Need more input ports for ApexOperator");
+    }
+    return stream;
+  }
+
+  public void addInputStream(ApexStream stream) {
+    if (!usedInputPorts[0]) {
+      stream.inputPort = inputPort0;
+      usedInputPorts[0] = true;
+    } else if (!usedInputPorts[1]) {
+      stream.inputPort = inputPort1;
+      usedInputPorts[1] = true;
+    } else if (!usedInputPorts[2]) {
+      stream.inputPort = inputPort2;
+      usedInputPorts[2] = true;
+    } else if (!usedInputPorts[3]) {
+      stream.inputPort = inputPort3;
+      usedInputPorts[3] = true;
+    } else if (!usedInputPorts[4]) {
+      stream.inputPort = inputPort4;
+      usedInputPorts[4] = true;
+    } else {
+      throw new RuntimeException("Need more input ports for ApexOperator");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexProcessingItem.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexProcessingItem.java
new file mode 100644
index 0000000..c95c24d
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexProcessingItem.java
@@ -0,0 +1,113 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.UUID;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+
+/**
+ * ProcessingItem implementation for Apache Apex. Wraps the processor in an {@link ApexOperator}, registers it in the
+ * topology's DAG and configures stream codecs and the operator partitioner for each incoming stream.
+ */
+public class ApexProcessingItem extends AbstractProcessingItem implements ApexTopologyNode {
+
+  private final ApexOperator operator;
+  /** DAG this item was added to; null until {@link #addToTopology(ApexTopology, int)} has been called. */
+  private DAG dag;
+  /** Number of output streams created so far; used as the suffix of the next stream id. */
+  private int numStreams;
+  private int numPartitions = 1;
+  /** True once a PARTITIONER attribute has been set on {@link #operator} in the current DAG. */
+  private boolean partitionerAssigned;
+
+  /** Creates an item named with a random UUID. */
+  public ApexProcessingItem(Processor processor, int parallelismHint) {
+    this(processor, UUID.randomUUID().toString(), parallelismHint);
+  }
+
+  /** Creates an item without an operator. */
+  public ApexProcessingItem() {
+    operator = null;
+  }
+
+  /**
+   * @param processor
+   *          processor to wrap
+   * @param friendlyId
+   *          name of this item; also the operator name used in {@link #addToTopology(ApexTopology, int)}
+   * @param parallelismHint
+   *          number of partitions requested from the partitioner
+   */
+  public ApexProcessingItem(Processor processor, String friendlyId, int parallelismHint) {
+    super(processor, parallelismHint);
+    this.operator = new ApexOperator(processor, parallelismHint);
+    this.setName(friendlyId);
+    this.numPartitions = parallelismHint;
+  }
+
+  /**
+   * Connects {@code inputStream} to the next free input port of the operator, adds the stream to the DAG and sets
+   * the stream codec that implements {@code scheme}.
+   * <p>
+   * BROADCAST installs an {@code AllPartitioner} on the operator. If no partitioner has been installed on the
+   * operator yet, a {@link StatelessPartitioner} is installed.
+   *
+   * @throws IllegalStateException
+   *           if this item has not been added to a topology yet
+   * @throws RuntimeException
+   *           if {@code scheme} is not one of SHUFFLE, BROADCAST or GROUP_BY_KEY
+   */
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
+    if (dag == null) {
+      throw new IllegalStateException(
+          "Processing item " + this.getName() + " must be added to a topology before connecting input streams");
+    }
+
+    ApexStream apexStream = (ApexStream) inputStream;
+    this.operator.addInputStream(apexStream);
+    dag.addStream(apexStream.getStreamId(), apexStream.outputPort, apexStream.inputPort);
+
+    // Setup stream codecs here
+    switch (scheme) {
+    case SHUFFLE:
+      dag.setInputPortAttribute(apexStream.inputPort, Context.PortContext.STREAM_CODEC,
+          new ApexStreamUtils.RandomStreamCodec<ContentEvent>());
+      break;
+    case BROADCAST:
+      dag.setAttribute(this.operator, Context.OperatorContext.PARTITIONER,
+          new ApexStreamUtils.AllPartitioner<ApexOperator>(numPartitions));
+      partitionerAssigned = true;
+      dag.setInputPortAttribute(apexStream.inputPort, Context.PortContext.STREAM_CODEC,
+          new ApexStreamUtils.JavaSerializationStreamCodec<ContentEvent>());
+      break;
+    case GROUP_BY_KEY:
+      dag.setInputPortAttribute(apexStream.inputPort, Context.PortContext.STREAM_CODEC,
+          new ApexStreamUtils.KeyBasedStreamCodec<ContentEvent>());
+      break;
+    default:
+      // Should never occur
+      throw new RuntimeException("Unknown partitioning scheme");
+    }
+
+    // The operator-level PARTITIONER is not visible through dag.getAttributes() (those are the application-level
+    // attributes), so whether one has been installed is tracked here. Checking the DAG attributes instead would
+    // always overwrite the broadcast partitioner set above.
+    if (!partitionerAssigned) {
+      dag.setAttribute(this.operator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<>(numPartitions));
+      partitionerAssigned = true;
+    }
+    return this;
+  }
+
+  /**
+   * Remembers the topology's DAG, stores {@code parallelismHint} in the operator and adds the operator to the DAG
+   * under this item's name.
+   */
+  @Override
+  public void addToTopology(ApexTopology topology, int parallelismHint) {
+    this.dag = topology.getDAG();
+    // Attributes are stored per DAG, so nothing has been assigned in the DAG just joined.
+    this.partitionerAssigned = false;
+    this.operator.instances = parallelismHint;
+    String fqcn = this.getName();
+    this.dag.addOperator(fqcn, this.operator);
+  }
+
+  /** Creates the next stream originating from the operator; ids end in a running counter. */
+  @Override
+  public ApexStream createStream() {
+    return operator.createStream("Stream_from_" + this.getName() + "_#" + numStreams++);
+  }
+
+  /** @return the superclass description prefixed with {@code "id: <name>, "} */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.insert(0, String.format("id: %s, ", this.getName()));
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStream.java 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStream.java
new file mode 100644
index 0000000..f70e8d5
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStream.java
@@ -0,0 +1,57 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import java.io.Serializable;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+
+/**
+ * Stream implementation for Apache Apex. Holds the stream id together with the output port that feeds the stream
+ * and the input port that consumes it; both ports are assigned from outside after construction.
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class ApexStream extends AbstractStream implements Serializable {
+
+  private static final long serialVersionUID = -5712513402991550847L;
+
+  private String streamId = "";
+  /** Port on the consuming operator. */
+  public DefaultInputPortSerializable<ContentEvent> inputPort;
+  /** Port on the producing operator; {@link #put(ContentEvent)} emits on it. */
+  public DefaultOutputPortSerializable<ContentEvent> outputPort;
+
+  /**
+   * @param id
+   *          identifier returned by {@link #getStreamId()}
+   */
+  public ApexStream(String id) {
+    this.streamId = id;
+  }
+
+  /** Emits the event on {@link #outputPort}. */
+  @Override
+  public void put(ContentEvent contentEvent) {
+    this.outputPort.emit(contentEvent);
+  }
+
+  @Override
+  public String getStreamId() {
+    return this.streamId;
+  }
+
+  /** Does nothing; the batch size is ignored by this implementation. */
+  @Override
+  public void setBatchSize(int batchsize) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStreamUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStreamUtils.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStreamUtils.java
new file mode 100644
index 0000000..d216d04
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexStreamUtils.java
@@ -0,0 +1,135 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.samoa.core.ContentEvent;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.netlet.util.Slice;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.stram.plan.logical.DefaultKryoStreamCodec;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.Lists;
+
+/**
+ * Stream codecs and partitioner used by the Apex adapter to implement SAMOA's partitioning schemes.
+ */
+public class ApexStreamUtils {
+
+  /**
+   * KeyBasedStreamCodec - Distributes tuples to down stream operators such that each tuple with the same key goes to
+   * the same partition. The partition value is the hash code of the event key, or 0 when the key is null.
+   */
+  public static class KeyBasedStreamCodec<T extends ContentEvent> extends JavaSerializationStreamCodec<T> {
+    private static final long serialVersionUID = -7144877905889718517L;
+
+    @Override
+    public int getPartition(T t) {
+      Object key = t.getKey();
+      // Events without a key all map to the same partition value instead of failing with an NPE.
+      return key == null ? 0 : key.hashCode();
+    }
+  }
+
+  /**
+   * RandomStreamCodec - Distributes tuples to down stream partitions randomly. The partition value is a uniformly
+   * distributed non-negative int.
+   */
+  public static class RandomStreamCodec<T extends ContentEvent> extends JavaSerializationStreamCodec<T> {
+    private static final long serialVersionUID = -7522462490354605783L;
+    private Random r;
+
+    public RandomStreamCodec() {
+      r = new Random();
+    }
+
+    @Override
+    public int getPartition(T t) {
+      // Clearing the sign bit always yields a non-negative value; Math.abs(Integer.MIN_VALUE) is still negative.
+      return r.nextInt() & Integer.MAX_VALUE;
+    }
+  }
+
+  /**
+   * Partitioner that creates a fixed number of partitions, all built from the operator instance of the first
+   * existing partition, and does not assign partition keys to them.
+   */
+  public static class AllPartitioner<T> implements Partitioner<T>, Serializable {
+    private static final long serialVersionUID = 7203316682043269713L;
+    private int newPartitionCount;
+
+    /**
+     * @param n
+     *          number of partitions returned by {@code definePartitions}
+     */
+    public AllPartitioner(int n) {
+      newPartitionCount = n;
+    }
+
+    /**
+     * Returns {@code newPartitionCount} new partitions wrapping the operator instance of the first entry of
+     * {@code partitions}. The existing partitions themselves are not reused and {@code context} is not consulted.
+     */
+    @Override
+    public Collection<com.datatorrent.api.Partitioner.Partition<T>> definePartitions(
+        Collection<com.datatorrent.api.Partitioner.Partition<T>> partitions,
+        com.datatorrent.api.Partitioner.PartitioningContext context) {
+      //Get a partition
+      Partition<T> partition = partitions.iterator().next();
+      Collection<Partition<T>> newPartitions = Lists.newArrayList();
+
+      for (int partitionCounter = 0; partitionCounter < newPartitionCount; partitionCounter++) {
+        newPartitions.add(new DefaultPartition<T>(partition.getPartitionedInstance()));
+      }
+      // NOTE(review): partition keys are intentionally not assigned here; presumably this is what makes every
+      // partition receive every tuple (broadcast) - confirm against the Apex partitioning documentation.
+
+      return newPartitions;
+    }
+
+    @Override
+    public void partitioned(Map<Integer, com.datatorrent.api.Partitioner.Partition<T>> partitions) {
+    }
+
+    public int getNewPartitionCount() {
+      return newPartitionCount;
+    }
+
+    public void setNewPartitionCount(int newPartitionCount) {
+      this.newPartitionCount = newPartitionCount;
+    }
+  }
+
+  /**
+   * Kryo stream codec whose Kryo instance uses {@link JavaSerializer} as its default serializer.
+   */
+  public static class JavaSerializationStreamCodec<T extends ContentEvent> extends DefaultKryoStreamCodec<T> {
+
+    private static final long serialVersionUID = -183071548840076388L;
+
+    public JavaSerializationStreamCodec() {
+      super();
+      this.kryo.setDefaultSerializer(JavaSerializer.class);
+    }
+
+    @Override
+    public Slice toByteArray(T info) {
+      return super.toByteArray(info);
+    }
+
+    @Override
+    public Object fromByteArray(Slice fragment) {
+      return super.fromByteArray(fragment);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTask.java 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTask.java
new file mode 100644
index 0000000..c5bca98
--- /dev/null
+++ b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTask.java
@@ -0,0 +1,151 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.samoa.apex.topology.impl.ApexStreamUtils.JavaSerializationStreamCodec;
+import org.apache.samoa.core.ContentEvent;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Attribute.AttributeMap;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
+import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * {@link StreamingApplication} that copies the operators, attributes and streams of an {@link ApexTopology}'s DAG
+ * into the DAG handed to {@link #populateDAG(DAG, Configuration)}. Streams that close a cycle are routed through a
+ * {@code DelayOperatorSerializable} inserted between the stream's source and its first sink.
+ */
+public class ApexTask implements StreamingApplication {
+
+  /** DAG built by the topology; the source for everything copied in {@link #populateDAG(DAG, Configuration)}. */
+  LogicalPlan dag;
+  String appName;
+  /** Not used by loop detection, which keeps its own per-path list. */
+  List<OperatorMeta> visited = Lists.newArrayList();
+  /** Streams of {@link #dag} found by {@link #detectLoops(DAG, Configuration)} to close a cycle. */
+  Set<StreamMeta> loopStreams = Sets.newHashSet();
+  Map<String, Integer> operatorNames = Maps.newHashMap();
+
+  public ApexTask(ApexTopology apexTopo) {
+    this.dag = (LogicalPlan) apexTopo.getDAG();
+    appName = apexTopo.getTopologyName();
+  }
+
+  /**
+   * Rebuilds the topology's DAG inside {@code dag}: operators with their operator and input-port attributes, then
+   * streams, replacing each loop stream by a pair of streams through a delay operator, and finally the application
+   * name.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public void populateDAG(DAG dag, Configuration conf) {
+
+    // Scratch copy of the topology (operators and streams only), handed to detectLoops below.
+    LogicalPlan dag2 = new LogicalPlan();
+    for (OperatorMeta o : this.dag.getAllOperators()) {
+      dag2.addOperator(o.getName(), o.getOperator());
+    }
+    for (StreamMeta s : this.dag.getAllStreams()) {
+      for (InputPortMeta i : s.getSinks()) {
+        Operator.OutputPort<Object> op = (OutputPort<Object>) s.getSource().getPortObject();
+        Operator.InputPort<Object> ip = (InputPort<Object>) i.getPortObject();
+        dag2.addStream(s.getName(), op, ip);
+      }
+    }
+
+    detectLoops(dag2, conf);
+
+    // Reconstruct Dag
+    for (OperatorMeta o : this.dag.getAllOperators()) {
+      dag.addOperator(o.getName(), o.getOperator());
+      for (Entry<Attribute<?>, Object> attr : o.getAttributes().entrySet()) {
+        dag.setAttribute(o.getOperator(), (Attribute) attr.getKey(), attr.getValue());
+      }
+      for (InputPortMeta meta : o.getInputStreams().keySet()) {
+        AttributeMap map = meta.getAttributes();
+        for (Entry<Attribute<?>, Object> entry : map.entrySet()) {
+          dag.setInputPortAttribute(meta.getPortObject(), (Attribute) entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    // Application-wide value, so it is set once rather than once per operator.
+    dag.setAttribute(Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 300);
+
+    for (StreamMeta s : this.dag.getAllStreams()) {
+      if (loopStreams.contains(s)) {
+        // Add delay Operator
+        DelayOperatorSerializable<ContentEvent> d = dag.addOperator("Delay_" + s.getName(),
+            new DelayOperatorSerializable<ContentEvent>());
+        dag.addStream("Delay" + s.getName() + "toDelay",
+            (DefaultOutputPort<ContentEvent>) s.getSource().getPortObject(), d.input);
+        dag.addStream("Delay" + s.getName() + "fromDelay", d.output,
+            (DefaultInputPort<ContentEvent>) s.getSinks().get(0).getPortObject());
+        // Both hops of a loop stream use the plain Java-serialization codec; this replaces any codec copied onto
+        // the sink port above.
+        dag.setInputPortAttribute(d.input, Context.PortContext.STREAM_CODEC,
+            new JavaSerializationStreamCodec<ContentEvent>());
+        dag.setInputPortAttribute(s.getSinks().get(0).getPortObject(), Context.PortContext.STREAM_CODEC,
+            new JavaSerializationStreamCodec<ContentEvent>());
+        continue;
+      }
+      for (InputPortMeta i : s.getSinks()) {
+        DefaultOutputPort<Object> op = (DefaultOutputPort<Object>) s.getSource().getPortObject();
+        DefaultInputPort<Object> ip = (DefaultInputPort<Object>) i.getPortObject();
+        Preconditions.checkArgument(op != null && ip != null);
+        dag.addStream(s.getName(), op, ip);
+      }
+    }
+    dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName);
+  }
+
+  /**
+   * Runs a depth-first search from every {@link InputOperator} of the topology's DAG and records the streams that
+   * close a cycle in {@link #loopStreams}.
+   * <p>
+   * Neither parameter is used: the search always walks {@code this.dag}, so that the recorded {@link StreamMeta}
+   * objects are the ones {@link #populateDAG(DAG, Configuration)} later looks up.
+   */
+  public void detectLoops(DAG dag, Configuration conf) {
+    List<OperatorMeta> inputOperators = Lists.newArrayList();
+    for (OperatorMeta om : this.dag.getAllOperators()) {
+      if (om.getOperator() instanceof InputOperator) {
+        inputOperators.add(om);
+      }
+    }
+
+    for (OperatorMeta o : inputOperators) {
+      // Every search starts with an empty path of its own.
+      List<OperatorMeta> path = Lists.newArrayList();
+      dfs(o, path);
+    }
+  }
+
+  /**
+   * Visits {@code o} and recurses into the first sink of each of its output streams.
+   *
+   * @param o
+   *          operator to visit
+   * @param visited
+   *          operators on the path from the search root to {@code o}; {@code o} is appended to it. A stream whose
+   *          first sink is already on this path is added to {@link #loopStreams} and not followed.
+   */
+  public void dfs(OperatorMeta o, List<OperatorMeta> visited) {
+    visited.add(o);
+
+    for (Entry<OutputPortMeta, StreamMeta> opm : o.getOutputStreams().entrySet()) {
+      // Samoa won't allow one output port to multiple input port kind of streams
+      OperatorMeta downStreamOp = opm.getValue().getSinks().get(0).getOperatorWrapper();
+      if (visited.contains(downStreamOp)) {
+        loopStreams.add(opm.getValue());
+      } else {
+        // Each branch gets its own copy so that sibling branches do not see each other's operators.
+        List<OperatorMeta> v2 = Lists.newArrayList();
+        v2.addAll(visited);
+        dfs(downStreamOp, v2);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopology.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopology.java
new file mode 100644
index 0000000..2316011
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopology.java
@@ -0,0 +1,71 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.Stream;
+
+/**
+ * {@link AbstractTopology} that carries the Apex {@link DAG} it belongs to.
+ * <p>
+ * Items handed to this topology are cast to {@link ApexTopologyNode} and asked to add themselves
+ * to it.
+ */
+public class ApexTopology extends AbstractTopology {
+
+  private DAG dag;
+
+  /**
+   * Creates a topology around an existing DAG.
+   *
+   * @param dag  DAG later returned by {@link #getDAG()}
+   * @param name topology name, forwarded to the superclass
+   */
+  public ApexTopology(DAG dag, String name) {
+    super(name);
+    this.dag = dag;
+  }
+
+  /**
+   * Creates a topology around a newly created {@link LogicalPlan}.
+   *
+   * @param name topology name, forwarded to the superclass
+   */
+  protected ApexTopology(String name) {
+    this(new LogicalPlan(), name);
+  }
+
+  /**
+   * Asks the entrance item to add itself to this topology with a parallelism hint of 1.
+   *
+   * @param epi entrance item; must also be an {@link ApexTopologyNode}
+   */
+  @Override
+  public void addEntranceProcessingItem(EntranceProcessingItem epi) {
+    ((ApexTopologyNode) epi).addToTopology(this, 1);
+  }
+
+  /**
+   * Adds the item with a parallelism hint of 1.
+   *
+   * @param procItem item to add
+   */
+  @Override
+  public void addProcessingItem(IProcessingItem procItem) {
+    addProcessingItem(procItem, 1);
+  }
+
+  /**
+   * Asks the item to add itself to this topology, then registers it with the superclass.
+   *
+   * @param procItem        item to add; must also be an {@link ApexTopologyNode}
+   * @param parallelismHint hint forwarded to the item and to the superclass
+   */
+  @Override
+  public void addProcessingItem(IProcessingItem procItem, int parallelismHint) {
+    ((ApexTopologyNode) procItem).addToTopology(this, parallelismHint);
+    super.addProcessingItem(procItem, parallelismHint);
+  }
+
+  /**
+   * Delegates to the superclass without additional work.
+   *
+   * @param stream stream to add
+   */
+  @Override
+  public void addStream(Stream stream) {
+    super.addStream(stream);
+  }
+
+  /**
+   * @return the DAG this topology was created with
+   */
+  public DAG getDAG() {
+    return dag;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopologyNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopologyNode.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopologyNode.java
new file mode 100644
index 0000000..63103ad
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/ApexTopologyNode.java
@@ -0,0 +1,38 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Package-private contract for objects that can add themselves to an {@link ApexTopology} and
+ * create an {@link ApexStream}.
+ */
+interface ApexTopologyNode {
+
+  /**
+   * Adds this node to the given topology.
+   *
+   * @param topology        topology this node is added to
+   * @param parallelismHint parallelism hint supplied by the caller for this node
+   */
+  void addToTopology(ApexTopology topology, int parallelismHint);
+
+  /**
+   * Creates a stream from this node.
+   *
+   * @return the created stream
+   */
+  ApexStream createStream();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultInputPortSerializable.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultInputPortSerializable.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultInputPortSerializable.java
new file mode 100644
index 0000000..5d3f0ee
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultInputPortSerializable.java
@@ -0,0 +1,31 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+import com.datatorrent.api.DefaultInputPort;
+
+public abstract class DefaultInputPortSerializable<T> extends 
DefaultInputPort<T> implements Serializable {
+
+  private static final long serialVersionUID = -8903550075465894319L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultOutputPortSerializable.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultOutputPortSerializable.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultOutputPortSerializable.java
new file mode 100644
index 0000000..f8a4281
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DefaultOutputPortSerializable.java
@@ -0,0 +1,31 @@
+package org.apache.samoa.apex.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+import com.datatorrent.api.DefaultOutputPort;
+
+public class DefaultOutputPortSerializable<T> extends DefaultOutputPort<T> 
implements Serializable {
+
+  private static final long serialVersionUID = 8661276624406533785L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DelayOperatorSerializable.java
----------------------------------------------------------------------
diff --git 
a/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DelayOperatorSerializable.java
 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DelayOperatorSerializable.java
new file mode 100644
index 0000000..b2bb184
--- /dev/null
+++ 
b/samoa-apex/src/main/java/org/apache/samoa/apex/topology/impl/DelayOperatorSerializable.java
@@ -0,0 +1,60 @@
+package org.apache.samoa.apex.topology.impl;
+
+import java.io.Serializable;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.datatorrent.common.util.DefaultDelayOperator;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * {@link DefaultDelayOperator} variant that implements {@link Serializable}, declares
+ * serializable {@code input} and {@code output} ports, and is annotated so that Kryo uses
+ * {@link JavaSerializer} for it.
+ *
+ * @param <T> tuple type
+ */
+@DefaultSerializer(JavaSerializer.class)
+public class DelayOperatorSerializable<T> extends DefaultDelayOperator<T> implements Serializable {
+
+  private static final long serialVersionUID = -2972537213450678368L;
+
+  /** Input port; every received tuple is passed to {@code processTuple}. */
+  public final DefaultInputPortSerializable<T> input = new DefaultInputPortSerializable<T>() {
+
+    private static final long serialVersionUID = 6830919916828325819L;
+
+    @Override
+    public void process(T tuple) {
+      processTuple(tuple);
+    }
+  };
+
+  /** Output port on which this operator emits tuples. */
+  public final DefaultOutputPortSerializable<T> output = new DefaultOutputPortSerializable<>();
+
+  /**
+   * Stores the tuple in {@code lastWindowTuples} and then emits it on {@code output}.
+   * <p>
+   * NOTE(review): presumably overrides the superclass method of the same name - confirm before
+   * adding {@code @Override}.
+   *
+   * @param tuple tuple received on {@code input}
+   */
+  protected void processTuple(T tuple) {
+    lastWindowTuples.add(tuple);
+    output.emit(tuple);
+  }
+
+  /**
+   * Emits every tuple currently held in {@code lastWindowTuples} on {@code output}.
+   */
+  @Override
+  public void firstWindow() {
+    for (T pending : lastWindowTuples) {
+      output.emit(pending);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/samoa-apex/src/main/resources/log4j.properties 
b/samoa-apex/src/main/resources/log4j.properties
new file mode 100644
index 0000000..9189640
--- /dev/null
+++ b/samoa-apex/src/main/resources/log4j.properties
@@ -0,0 +1,42 @@
+###
+# #%L
+# SAMOA
+# %%
+# Copyright (C) 2014 - 2016 Apache Software Foundation
+# %%
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# #L%
+###
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M 
- %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - 
%m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - 
%m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=debug
+log4j.logger.org.apache.samoa=debug
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=info

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/test/java/org/apache/samoa/apex/AlgosTestApex.java
----------------------------------------------------------------------
diff --git a/samoa-apex/src/test/java/org/apache/samoa/apex/AlgosTestApex.java 
b/samoa-apex/src/test/java/org/apache/samoa/apex/AlgosTestApex.java
new file mode 100644
index 0000000..80d9449
--- /dev/null
+++ b/samoa-apex/src/test/java/org/apache/samoa/apex/AlgosTestApex.java
@@ -0,0 +1,68 @@
+package org.apache.samoa.apex;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2016 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.TestParams;
+import org.apache.samoa.TestUtils;
+import org.apache.samoa.apex.LocalApexDoTask;
+import org.junit.Test;
+
+/**
+ * Runs the shared {@link TestUtils#test} harness with {@link LocalApexDoTask} as the task class.
+ */
+public class AlgosTestApex {
+
+  /**
+   * Runs the {@code PREQEVAL_VHT_RANDOMTREE} command-line template; the test is aborted after
+   * 600000 ms.
+   */
+  @Test(timeout = 600000)
+  public void testVHTWithApex() throws Exception {
+    TestParams.Builder builder = new TestParams.Builder();
+    TestParams vhtParams = builder
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(30)
+        .prePollWait(15)
+        .taskClassName(LocalApexDoTask.class.getName())
+        .build();
+
+    TestUtils.test(vhtParams);
+  }
+
+  /**
+   * Runs the {@code PREQEVAL_NAIVEBAYES_HYPERPLANE} command-line template; the test is aborted
+   * after 120000 ms.
+   */
+  @Test(timeout = 120000)
+  public void testNaiveBayesWithApex() throws Exception {
+    TestParams.Builder builder = new TestParams.Builder();
+    TestParams naiveBayesParams = builder
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(190_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
+        .resultFilePollTimeout(40)
+        .prePollWait(20)
+        .taskClassName(LocalApexDoTask.class.getName())
+        .build();
+
+    TestUtils.test(naiveBayesParams);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-apex/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/samoa-apex/src/test/resources/log4j.properties 
b/samoa-apex/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9189640
--- /dev/null
+++ b/samoa-apex/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+###
+# #%L
+# SAMOA
+# %%
+# Copyright (C) 2014 - 2016 Apache Software Foundation
+# %%
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# #L%
+###
+log4j.rootLogger=INFO,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M 
- %m%n
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - 
%m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - 
%m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=debug
+log4j.logger.org.apache.samoa=debug
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=info

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
index aff25bf..967682a 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -22,6 +22,7 @@ package org.apache.samoa.learners.classifiers.trees;
 
 import static org.apache.samoa.moa.core.Utils.maxIndex;
 
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -620,11 +621,11 @@ final class ModelAggregatorProcessor implements Processor 
{
    * @author Arinto Murdopo
    * 
    */
-  static class SplittingNodeInfo {
+  static class SplittingNodeInfo implements Serializable {
 
     private final ActiveLearningNode activeLearningNode;
     private final FoundNode foundNode;
-    private final ScheduledFuture<?> scheduledFuture;
+    private final transient ScheduledFuture<?> scheduledFuture;
 
     SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode 
foundNode, ScheduledFuture<?> scheduledFuture) {
       this.activeLearningNode = activeLearningNode;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1c858e1e/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
index 070021e..9ee831e 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
@@ -46,7 +46,7 @@ public class ArffFileStream extends FileStream {
       -1, -1, Integer.MAX_VALUE);*/
 
   protected InstanceExample lastInstanceRead;
-  private BufferedReader fileReader;
+  private transient BufferedReader fileReader;
 
   @Override
   public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {

Reply via email to