Repository: incubator-samoa
Updated Branches:
  refs/heads/master 1b3529983 -> 64ef7a921


SAMOA-16: Add an adapter for Apache Flink-Streaming (senorcarbone)
Fix #11


Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/64ef7a92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/64ef7a92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/64ef7a92

Branch: refs/heads/master
Commit: 64ef7a921833b020ea9e556579f0d933e07b0a63
Parents: 1b35299
Author: Gianmarco De Francisci Morales <[email protected]>
Authored: Tue May 26 11:30:35 2015 +0300
Committer: Gianmarco De Francisci Morales <[email protected]>
Committed: Tue May 26 11:30:35 2015 +0300

----------------------------------------------------------------------
 .gitignore                                      |   3 +-
 bin/samoa                                       |  62 ++++-
 bin/samoa-flink.properties                      |  35 +++
 pom.xml                                         |  12 +
 samoa-flink/pom.xml                             | 134 ++++++++++
 .../java/com/yahoo/labs/flink/FlinkDoTask.java  |  87 +++++++
 .../labs/flink/helpers/CircleDetection.java     |  99 ++++++++
 .../com/yahoo/labs/flink/helpers/Utils.java     |  69 ++++++
 .../flink/topology/impl/FlinkComponent.java     |  68 +++++
 .../topology/impl/FlinkComponentFactory.java    |  66 +++++
 .../impl/FlinkEntranceProcessingItem.java       | 101 ++++++++
 .../topology/impl/FlinkProcessingItem.java      | 248 +++++++++++++++++++
 .../labs/flink/topology/impl/FlinkStream.java   |  94 +++++++
 .../labs/flink/topology/impl/FlinkTopology.java | 185 ++++++++++++++
 .../labs/flink/topology/impl/SamoaType.java     |  42 ++++
 15 files changed, 1303 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 3cf0208..294c718 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@ target/
 
 #intellij
 .idea/
-.iml
+*.iml
+*.iws

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/bin/samoa
----------------------------------------------------------------------
diff --git a/bin/samoa b/bin/samoa
index b34f65b..0ace74b 100755
--- a/bin/samoa
+++ b/bin/samoa
@@ -4,7 +4,7 @@
 # #%L
 # SAMOA
 # %%
-# Copyright (C) 2013 Yahoo! Inc.
+# Copyright (C) 2014 - 2015 Apache Software Foundation
 # %%
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -281,6 +281,66 @@ elif [ $PLATFORM = 'SAMZA' ]; then
                --kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE 
--pi_per_container=$PI_PER_CONTAINER \
               --samoa_hdfs_dir=$HDFS_SAMOA_HOME
 
+elif [ $PLATFORM = 'FLINK' ]; then
+
+        echo "Deploying to $PLATFORM"
+        if [ -z "$FLINK_HOME" ]; then
+            echo "FLINK_HOME is not set!"
+            echo "Please set FLINK_HOME to point to your Flink installation"
+            exit -1
+        fi
+
+        # Quote $2: when the jar argument is missing, the unquoted form `[ ! -f ]`
+        # degenerates into a string test that succeeds, silently skipping this check.
+        if [ ! -f "$2" ]; then
+            echo "$2 does not exist!"
+            echo "Please use a valid jar file for Flink execution"
+            exit -1
+        fi
+
+        FLINK_EXEC="$FLINK_HOME/bin/flink"
+
+        SAMOA_FLINK_PROPERTIES="samoa-flink.properties"
+        MODE_OPTION="samoa.flink.mode"
+
+        # getvalue KEY: sets VALUE to the value of the last non-comment "KEY=..." line of
+        # the Flink properties file, trimmed of surrounding whitespace. The key is anchored
+        # at the start of the line so that keys merely containing KEY do not match.
+        VALUE=""
+        getvalue()
+        {
+            VALUE=`sed '/^\#/d' "$BASE_DIR/$SAMOA_FLINK_PROPERTIES" | grep "^[[:space:]]*$1[[:space:]]*=" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
+        }
+
+        getvalue "$MODE_OPTION"
+        MODE_ARG="$VALUE"
+
+        # Forward every argument after the second one (the jar checked above) to the task.
+        COMPLETE_ARG=""
+        COUNTER=0
+        for var in "$@"
+        do
+            COUNTER=`expr $COUNTER + 1`
+            if [ $COUNTER -gt 2 ]; then
+                COMPLETE_ARG="$COMPLETE_ARG $var"
+            fi
+        done
+
+        # NOTE(review): the existence check above is on $2; confirm JAR_PATH is set from $2.
+        DEPLOYABLE=$JAR_PATH
+        echo "$DEPLOYABLE"
+        if [ "$MODE_ARG" = "cluster" ]; then
+                getvalue "samoa.flink.flinkMaster"
+                FLINK_MASTER="$VALUE"
+
+                getvalue "samoa.flink.port"
+                FLINK_PORT="$VALUE"
+
+                # COMPLETE_ARG is intentionally unquoted: FlinkDoTask re-joins its arguments.
+                "$FLINK_EXEC" run -m $FLINK_MASTER:$FLINK_PORT "$DEPLOYABLE" $COMPLETE_ARG
+
+        elif [ "$MODE_ARG" = "local" ]; then
+                "$FLINK_EXEC" run "$DEPLOYABLE" $COMPLETE_ARG
+        else
+                echo "Unknown $MODE_OPTION value '$MODE_ARG' in $SAMOA_FLINK_PROPERTIES"
+                echo "Please set it to either 'local' or 'cluster'"
+                exit -1
+        fi
+
 elif [ $PLATFORM = 'THREADS' ]; then
        
        echo "Deploying to LOCAL with MULTITHREADING."

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/bin/samoa-flink.properties
----------------------------------------------------------------------
diff --git a/bin/samoa-flink.properties b/bin/samoa-flink.properties
new file mode 100644
index 0000000..b9f56c0
--- /dev/null
+++ b/bin/samoa-flink.properties
@@ -0,0 +1,35 @@
+###
+# #%L
+# SAMOA
+# %%
+# Copyright (C) 2014 - 2015 Apache Software Foundation
+# %%
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# #L%
+###
+
+# SAMOA Flink properties file
+# This file contains specific configurations for SAMOA deployment on the Flink platform
+
+# samoa.flink.mode corresponds to the execution mode of a Task in Flink
+# possible values:
+# 1. local   - to run the task in local StreamingEnvironment
+# 2. cluster - to run the task in the specified cluster
+samoa.flink.mode=cluster
+
+# If samoa.flink.mode is "cluster", the following parameters must also be set:
+# @samoa.flink.flinkMaster: the IP address of the cluster
+# @samoa.flink.port : the port
+samoa.flink.flinkMaster=127.0.0.1
+samoa.flink.port=6123
+

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 622d952..819a13c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,15 @@
             </modules>
         </profile>
         <profile>
+            <id>flink</id>
+            <modules>
+                <module>samoa-instances</module>
+                <module>samoa-api</module>
+                <module>samoa-flink</module>
+                <module>samoa-test</module>
+            </modules>
+        </profile>
+        <profile>
             <id>samza</id>
             <modules>
                 <module>samoa-instances</module>
@@ -104,6 +113,7 @@
                 <module>samoa-local</module>
                 <module>samoa-threads</module>
                 <module>samoa-storm</module>
+                <module>samoa-flink</module>
                 <module>samoa-s4</module>
                 <module>samoa-samza</module>
                 <module>samoa-test</module>
@@ -130,6 +140,7 @@
         <miniball.version>1.0.3</miniball.version>
         <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
+        <flink.version>0.9.0-milestone-1</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
@@ -211,6 +222,7 @@
                         <root>samoa-local</root>
                         <root>samoa-storm</root>
                         <root>samoa-s4</root>
+                        <root>samoa-flink</root>
                         <root>samoa-samza</root>
                         <root>bin</root>
                     </roots>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
new file mode 100644
index 0000000..f00fe3c
--- /dev/null
+++ b/samoa-flink/pom.xml
@@ -0,0 +1,134 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <kryo.version>2.24.0</kryo.version>
+    </properties>
+      <repositories>
+                <repository>
+                        <id>apache.snapshots</id>
+                        <name>Apache Development Snapshot Repository</name>
+                        <url>https://repository.apache.org/content/repositories/snapshots/</url>
+                        <releases>
+                                <enabled>false</enabled>
+                        </releases>
+                        <snapshots>
+                                <enabled>true</enabled>
+                        </snapshots>
+                </repository>
+        </repositories>
+    <name>samoa-flink</name>
+    <description>Flink engine for SAMOA</description>
+
+    <artifactId>samoa-flink</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    
+    
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                    <artifactId>kryo</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j-log4j12.version}</version>
+        </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-core</artifactId>
+                       <version>${flink.version}</version>
+            <!--<scope>provided</scope>-->
+               </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <!-- Flink assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    <finalName>SAMOA-Flink-${project.version}</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifestEntries>
+                            <!--<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>-->
+                            <Bundle-Description>${project.description}</Bundle-Description>
+                            <Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
+                            <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                        </manifestEntries>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <mainClass>com.yahoo.labs.flink.FlinkDoTask</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>-Xmx1G</argLine>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
new file mode 100644
index 0000000..6069de9
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
@@ -0,0 +1,87 @@
+package com.yahoo.labs.flink;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory;
+import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem;
+import com.yahoo.labs.flink.topology.impl.FlinkStream;
+import com.yahoo.labs.flink.topology.impl.FlinkTopology;
+import com.yahoo.labs.samoa.tasks.Task;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Main class to run a SAMOA task on Apache Flink.
+ *
+ * <p>The command-line arguments are joined into a single SAMOA CLI string, from which a
+ * {@link Task} is instantiated. The task receives a {@link FlinkComponentFactory} bound to the
+ * current {@link StreamExecutionEnvironment}; its {@link FlinkTopology} is then built and the
+ * resulting Flink job is executed.
+ */
+public class FlinkDoTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class);
+
+    /**
+     * Entry point invoked by the Flink client.
+     *
+     * @param args the SAMOA task description, possibly split over several arguments
+     * @throws IllegalArgumentException if the arguments do not describe a valid SAMOA task
+     * @throws Exception if building or executing the Flink job fails
+     */
+    public static void main(String[] args) throws Exception {
+        // Init Task: re-join the arguments into one CLI string, since a wrapper script
+        // may pass the task description word-split over several arguments.
+        StringBuilder cliString = new StringBuilder();
+        for (String arg : args) {
+            cliString.append(" ").append(arg);
+        }
+        logger.debug("Command line string = {}", cliString.toString());
+        System.out.println("Command line string = " + cliString.toString());
+
+        Task task;
+        try {
+            task = ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
+            logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName());
+        } catch (Exception e) {
+            logger.error("Failed to initialize the task: ", e);
+            System.out.println("Failed to initialize the task: " + e);
+            // Fail loudly instead of returning normally, so the submission is not
+            // mistaken for a successful run; the original cause is preserved.
+            throw new IllegalArgumentException(
+                    "Failed to initialize the task from:" + cliString, e);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        task.setFactory(new FlinkComponentFactory(env));
+        task.init();
+
+        logger.debug("Building Flink topology...");
+        ((FlinkTopology) task.getTopology()).build();
+
+        logger.debug("Submitting the job...");
+        env.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
new file mode 100644
index 0000000..a832ee9
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
@@ -0,0 +1,99 @@
+package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * This class contains all logic needed in order to mark circles in job graphs explicitly, as
+ * required by Apache Flink.
+ *
+ * <p>The search is a variant of Tarjan's strongly-connected-components algorithm: whenever the
+ * depth-first search follows an edge to a node that is still on the DFS stack, the nodes popped
+ * down to (and including) that node are reported as one circle. A circle is therefore a list of
+ * node ids in stack-pop order, i.e. the most recently visited node first.
+ *
+ * <p>Instances hold mutable per-call state; each call to {@link #getCircles(List[])} starts from a
+ * clean state.
+ */
+public class CircleDetection {
+    private int[] index;
+    private int[] lowLink;
+    private int counter;
+    private Stack<Integer> stack;
+    private List<List<Integer>> scc;
+    private List<Integer>[] graph;
+
+
+    public CircleDetection() {
+        stack = new Stack<Integer>();
+        scc = new ArrayList<>();
+    }
+
+    /**
+     * Finds the circles of a directed graph.
+     *
+     * @param adjacencyList {@code adjacencyList[v]} holds the successor ids of node {@code v};
+     *     every id must lie in {@code [0, adjacencyList.length)}
+     * @return a new list with one entry per detected circle
+     */
+    public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
+        graph = adjacencyList;
+        index = new int[adjacencyList.length];
+        lowLink = new int[adjacencyList.length];
+        counter = 0;
+        // Reset per-call state so that reusing an instance does not return circles (or keep
+        // stack entries) left over from a previously analysed graph.
+        stack = new Stack<Integer>();
+        scc = new ArrayList<>();
+
+        //initialize index and lowLink as "undefined"(=-1)
+        for (int j = 0; j < graph.length; j++) {
+            index[j] = -1;
+            lowLink[j] = -1;
+        }
+        for (int v = 0; v < graph.length; v++) {
+            if (index[v] == -1) { //undefined.
+                findSCC(v);
+            }
+        }
+        return scc;
+    }
+
+    /**
+     * Depth-first visit of {@code node}: assigns its DFS index, explores its successors and
+     * records a circle whenever an edge leads back to a node that is still on the stack.
+     */
+    private void findSCC(int node) {
+        index[node] = counter;
+        lowLink[node] = counter;
+        counter++;
+        stack.push(node);
+
+        for (int neighbor : graph[node]) {
+            if (index[neighbor] == -1) {
+                // Unvisited successor: visit it first, then inherit its low-link.
+                findSCC(neighbor);
+                lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]);
+            } else if (stack.contains(neighbor)) { //if neighbor has been already visited
+                lowLink[node] = Math.min(lowLink[node], index[neighbor]);
+                // Everything down to (and including) neighbor forms the circle.
+                List<Integer> sccComponent = new ArrayList<Integer>();
+                int w;
+                do {
+                    w = stack.pop();
+                    sccComponent.add(w);
+                } while (neighbor != w);
+                //add neighbor again, just in case it is a member of another circle
+                stack.add(neighbor);
+                scc.add(sccComponent);
+            }
+
+        }
+        if (lowLink[node] == index[node]) {
+            // node is the root of its component: drop what remains of it from the stack.
+            int w;
+            do {
+                w = stack.pop();
+            } while (node != w);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
new file mode 100644
index 0000000..fe1b960
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
@@ -0,0 +1,69 @@
+package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.topology.impl.SamoaType;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+/**
+ * Static helpers shared by the Flink topology components.
+ */
+public class Utils {
+
+    /**
+     * Type information for {@link SamoaType} tuples, whose fields are (key, event, stream id):
+     * {@code f0} is the grouping key, {@code f1} the {@link ContentEvent} and {@code f2} the id
+     * of the stream the event belongs to. Final so it cannot be reassigned at runtime.
+     */
+    @SuppressWarnings("unchecked") // TupleTypeInfo is built from a raw constructor call
+    public static final TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class,
+            STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO);
+
+    private Utils() {
+        // static helpers only, not instantiable
+    }
+
+    /**
+     * Applies the Flink partitioning that corresponds to a SAMOA partitioning scheme.
+     *
+     * @param stream the stream to partition
+     * @param partitioning the SAMOA scheme; anything other than BROADCAST or GROUP_BY_KEY is
+     *     treated as SHUFFLE
+     * @return the partitioned stream
+     */
+    public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) {
+        switch (partitioning) {
+            case BROADCAST:
+                return stream.broadcast();
+            case GROUP_BY_KEY:
+                return stream.groupBy(new KeySelector<SamoaType, String>() {
+                    @Override
+                    public String getKey(SamoaType samoaType) throws Exception {
+                        return samoaType.f0;
+                    }
+                });
+            case SHUFFLE:
+            default:
+                return stream.shuffle();
+        }
+    }
+
+    /**
+     * Builds a filter that keeps only the tuples whose stream id ({@code f2}) equals
+     * {@code streamID}.
+     *
+     * @param streamID the stream id to keep
+     * @return the filter function
+     */
+    public static FilterFunction<SamoaType> getFilter(final String streamID) {
+        return new FilterFunction<SamoaType>() {
+            @Override
+            public boolean filter(SamoaType o) throws Exception {
+                return o.f2.equals(streamID);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
new file mode 100644
index 0000000..70a7838
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
@@ -0,0 +1,68 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Common interface of {@link FlinkEntranceProcessingItem} and {@link FlinkProcessingItem}.
+ */
+public interface FlinkComponent {
+
+    /**
+     * Initialises the node. Implementations should create the right invokables and apply the
+     * appropriate stream transformations, producing the DataStream returned by
+     * {@link #getOutStream()}.
+     */
+    void initialise();
+
+    /**
+     * Tells whether all the requirements for initialising this Flink component (i.e. building its
+     * DataStream) are satisfied. This check is necessary in this integration because Flink
+     * Streaming generates DataStreams eagerly, as transformations are applied.
+     *
+     * @return {@code true} if {@link #initialise()} can be called now
+     */
+    boolean canBeInitialised();
+
+    /**
+     * Tells whether this component has already been initialised.
+     *
+     * @return {@code true} once {@link #initialise()} has been performed
+     */
+    boolean isInitialised();
+
+    /**
+     * The wrapped Flink DataStream generated by this component. Mind that the component must be
+     * initialised first in order to have a generated DataStream.
+     *
+     * @return the output stream of this component
+     */
+    DataStream<SamoaType> getOutStream();
+
+    /**
+     * A unique component id.
+     *
+     * @return the id of this component
+     */
+    int getComponentId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
new file mode 100644
index 0000000..fca0c1a
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
@@ -0,0 +1,66 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.*;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * An implementation of SAMOA's {@link ComponentFactory} for Apache Flink. Every processing item
+ * and topology it creates is bound to the same {@link StreamExecutionEnvironment}.
+ */
+public class FlinkComponentFactory implements ComponentFactory {
+
+    private final StreamExecutionEnvironment env;
+
+    /**
+     * @param env the Flink environment passed to every component and topology created here
+     */
+    public FlinkComponentFactory(StreamExecutionEnvironment env) {
+        this.env = env;
+    }
+
+    @Override
+    public ProcessingItem createPi(Processor processor) {
+        return new FlinkProcessingItem(env, processor);
+    }
+
+    @Override
+    public ProcessingItem createPi(Processor processor, int parallelism) {
+        return new FlinkProcessingItem(env, processor, parallelism);
+    }
+
+    @Override
+    public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+        return new FlinkEntranceProcessingItem(env, entranceProcessor);
+    }
+
+    /**
+     * Creates an output stream for {@code sourcePi}. Processing items build their own stream via
+     * {@link FlinkProcessingItem#createStream()}; any other Flink component is wrapped in a
+     * {@link FlinkStream}.
+     *
+     * @throws IllegalArgumentException if {@code sourcePi} is not a {@link FlinkComponent}
+     */
+    @Override
+    public Stream createStream(IProcessingItem sourcePi) {
+        if (sourcePi instanceof FlinkProcessingItem) {
+            return ((FlinkProcessingItem) sourcePi).createStream();
+        }
+        if (!(sourcePi instanceof FlinkComponent)) {
+            // Fail with a clear message instead of an opaque ClassCastException (or a
+            // FlinkStream wrapping null).
+            throw new IllegalArgumentException(
+                    "Cannot create a Flink stream from a non-Flink component: " + sourcePi);
+        }
+        return new FlinkStream((FlinkComponent) sourcePi);
+    }
+
+    @Override
+    public Topology createTopology(String topologyName) {
+        return new FlinkTopology(topologyName, env);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
new file mode 100644
index 0000000..5dca509
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
@@ -0,0 +1,101 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem
+               implements FlinkComponent, Serializable {
+
+       private transient StreamExecutionEnvironment env;
+       private transient DataStream outStream;
+
+
+       public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, 
EntranceProcessor proc) {
+               super(proc);
+               this.env = env;
+       }
+
+       @Override
+       public void initialise() {
+               final EntranceProcessor proc = getProcessor();
+               final String streamId = getOutputStream().getStreamId();
+               final int compID = getComponentId();
+
+               
+               outStream = env.addSource(new RichSourceFunction<SamoaType>() {
+                       volatile boolean canceled;
+                       EntranceProcessor entrProc = proc;
+                       String id = streamId;
+
+                       @Override
+                       public void open(Configuration parameters) throws 
Exception {
+                               super.open(parameters);
+                               entrProc.onCreate(compID);
+                       }
+
+                       @Override
+                       public void run(Collector<SamoaType> collector) throws 
Exception {
+                               while (!canceled && entrProc.hasNext()) {
+                                       
collector.collect(SamoaType.of(entrProc.nextEvent(), id));
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               canceled = true;
+                       }
+               },Utils.tempTypeInfo);
+
+               ((FlinkStream) getOutputStream()).initialise();
+       }
+
+
+       @Override
+       public boolean canBeInitialised() {
+               return true;
+       }
+
+       @Override
+       public boolean isInitialised() {
+               return outStream != null;
+       }
+
+       @Override
+       public int getComponentId() {
+               return -1; // dummy number shows that it comes from an Entrance 
PI
+       }
+
+       @Override
+       public DataStream getOutStream() {
+               return outStream;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
new file mode 100644
index 0000000..f92182e
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
@@ -0,0 +1,248 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> 
implements ProcessingItem, FlinkComponent, Serializable {
+
+       private static final Logger logger = 
LoggerFactory.getLogger(FlinkProcessingItem.class);
+       public static final int MAX_WAIT_TIME_MILLIS = 10000;
+
+       private final Processor processor;
+       private final transient StreamExecutionEnvironment env;
+       private final SamoaDelegateFunction fun;
+       private transient DataStream<SamoaType> inStream;
+       private transient DataStream<SamoaType> outStream;
+       private transient List<FlinkStream> outputStreams = 
Lists.newArrayList();
+       private transient List<Tuple3<FlinkStream, PartitioningScheme, 
Integer>> inputStreams = Lists.newArrayList();
+       private int parallelism;
+       private static int numberOfPIs = 0;
+       private int piID;
+       private List<Integer> circleId; //check if we can refactor this
+       private boolean onIteration;
+       //private int circleId; //check if we can refactor this
+
+       public FlinkProcessingItem(StreamExecutionEnvironment env, Processor 
proc) {
+               this(env, proc, 1);
+       }
+
+       public FlinkProcessingItem(StreamExecutionEnvironment env, Processor 
proc, int parallelism) {
+               this(env, new SamoaDelegateFunction(proc), proc, parallelism);
+       }
+
+       public FlinkProcessingItem(StreamExecutionEnvironment env, 
SamoaDelegateFunction fun, Processor proc, int parallelism) {
+               super(fun);
+               this.env = env;
+               this.fun = fun;
+               this.processor = proc;
+               this.parallelism = parallelism;
+               this.piID = numberOfPIs++;
+               this.circleId = new ArrayList<Integer>() {
+               }; // if size equals 0, then it is part of no circle
+       }
+
+       public Stream createStream() {
+               FlinkStream generatedStream = new FlinkStream(this);
+               outputStreams.add(generatedStream);
+               return generatedStream;
+       }
+
+       public void putToStream(ContentEvent data, Stream targetStream) {
+               collector.collect(SamoaType.of(data, 
targetStream.getStreamId()));
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.processor.onCreate(getComponentId());
+       }
+
+       @Override
+       public void initialise() {
+               for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : inputStreams) {
+                       if (inputStream.f0.isInitialised()) { //if input stream 
is initialised
+                               try {
+                                       DataStream toBeMerged = 
Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1);
+                                       if (inStream == null) {
+                                               inStream = toBeMerged;
+                                       } else {
+                                               inStream = 
inStream.merge(toBeMerged);
+                                       }
+                               } catch (RuntimeException e) {
+                                       e.printStackTrace();
+                                       System.exit(1);
+                               }
+                       }
+               }
+
+               if (onIteration) {
+                       inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS);
+               }
+               outStream = inStream.transform("samoaProcessor", 
Utils.tempTypeInfo, this).setParallelism(parallelism);
+       }
+
+       public void initialiseStreams() {
+               for (FlinkStream stream : this.getOutputStreams()) {
+                       stream.initialise();
+               }
+       }
+
+       @Override
+       public boolean canBeInitialised() {
+               for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : inputStreams) {
+                       if (!inputStream.f0.isInitialised()) return false;
+               }
+               return true;
+       }
+
+       @Override
+       public boolean isInitialised() {
+               return outStream != null;
+       }
+
+       @Override
+       public Processor getProcessor() {
+               return processor;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       SamoaType t = nextObject;
+                       fun.processEvent(t.f1);
+               }
+       }
+
+       @Override
+       public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, 
PartitioningScheme.SHUFFLE, ((FlinkStream) inputStream).getSourcePiId()));
+               return this;
+       }
+
+       @Override
+       public ProcessingItem connectInputKeyStream(Stream inputStream) {
+               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, 
PartitioningScheme.GROUP_BY_KEY, ((FlinkStream) inputStream).getSourcePiId()));
+               return this;
+       }
+
+       @Override
+       public ProcessingItem connectInputAllStream(Stream inputStream) {
+               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, 
PartitioningScheme.BROADCAST, ((FlinkStream) inputStream).getSourcePiId()));
+               return this;
+       }
+
+       @Override
+       public int getParallelism() {
+               return parallelism;
+       }
+
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+       }
+
+       public List<FlinkStream> getOutputStreams() {
+               return outputStreams;
+       }
+
+       public DataStream<SamoaType> getOutStream() {
+               return this.outStream;
+       }
+
+       public void setOutStream(DataStream outStream) {
+               this.outStream = outStream;
+       }
+
+       @Override
+       public int getComponentId() {
+               return piID;
+       }
+
+       public boolean isPartOfCircle() {
+               return this.circleId.size() > 0;
+       }
+
+       public List<Integer> getCircleIds() {
+               return circleId;
+       }
+
+       public void addPItoLoop(int piId) {
+               this.circleId.add(piId);
+       }
+
+       public DataStream<SamoaType> getInStream() {
+               return inStream;
+       }
+
+       public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> 
getInputStreams() {
+               return inputStreams;
+       }
+
+       public void setOnIteration(boolean onIteration) {
+               this.onIteration = onIteration;
+       }
+
+       public boolean isOnIteration() {
+               return onIteration;
+       }
+
+       static class SamoaDelegateFunction implements Function, Serializable {
+               private final Processor proc;
+
+               SamoaDelegateFunction(Processor proc) {
+                       this.proc = proc;
+               }
+
+               public void processEvent(ContentEvent event) {
+                       proc.process(event);
+               }
+       }
+
+       public FlinkStream getInputStreamBySourceID(int sourceID) {
+               for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams 
: inputStreams) {
+                       if (fstreams.f2 == sourceID) {
+                               return fstreams.f0;
+                       }
+               }
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
new file mode 100644
index 0000000..c5cb0ed
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
@@ -0,0 +1,94 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.io.Serializable;
+
+
+/**
+ * A stream for SAMOA based on Apache Flink's DataStream
+ */
+public class FlinkStream extends AbstractStream implements FlinkComponent, 
Serializable {
+
+       private static int outputCounter = 0;
+       private FlinkComponent procItem;
+       private transient DataStream<SamoaType> dataStream;
+       private int sourcePiId;
+       private String flinkStreamId;
+
+       public FlinkStream(FlinkComponent sourcePi) {
+               this.procItem = sourcePi;
+               this.sourcePiId = sourcePi.getComponentId();
+               setStreamId("stream-" + Integer.toString(outputCounter));
+               flinkStreamId = "stream-" + Integer.toString(outputCounter);
+               outputCounter++;
+       }
+
+       @Override
+       public void initialise() {
+               if (procItem instanceof FlinkProcessingItem) {
+                       dataStream = 
procItem.getOutStream().filter(Utils.getFilter(getStreamId()))
+                       .setParallelism(((FlinkProcessingItem) 
procItem).getParallelism());
+               } else
+                       dataStream = procItem.getOutStream();
+       }
+
+       @Override
+       public boolean canBeInitialised() {
+               return procItem.isInitialised();
+       }
+
+       @Override
+       public boolean isInitialised() {
+               return dataStream != null;
+       }
+
+       @Override
+       public DataStream getOutStream() {
+               return dataStream;
+       }
+
+       @Override
+       public void put(ContentEvent event) {
+               ((FlinkProcessingItem) procItem).putToStream(event, this);
+       }
+
+       @Override
+       public int getComponentId() {
+               return -1; //dummy number shows that it comes from a Stream
+       }
+
+       public int getSourcePiId() {
+               return sourcePiId;
+       }
+
+       @Override
+       public String getStreamId() {
+               return flinkStreamId;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
new file mode 100644
index 0000000..f04d792
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
@@ -0,0 +1,185 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A SAMOA topology on Apache Flink
+ * 
+ * A Samoa-Flink Streaming Topology is DAG of ProcessingItems encapsulated 
within custom operators.
+ * Streams are tagged and filtered in each operator's output so they can be 
routed to the right 
+ * operator respectively. Building a Flink topology from a Samoa task involves 
invoking all these
+ * stream transformations and finally, marking and initiating loops in the 
graph. We have to do that
+ * since Flink only allows explicit loops in the topology started with 
'iterate()' and closed with
+ * 'closeWith()'. Thus, when we build a flink topology we have to do it 
incrementally from the 
+ * sources, mark loops and initialize them with explicit iterations.
+ * 
+ */
+public class FlinkTopology extends AbstractTopology {
+
+       private static final Logger logger = 
LoggerFactory.getLogger(FlinkTopology.class);
+       public static StreamExecutionEnvironment env;
+       public List<List<FlinkProcessingItem>> topologyLoops = new 
ArrayList<>();
+       public  List<Integer> backEdges = new ArrayList<Integer>();
+
+       public FlinkTopology(String name, StreamExecutionEnvironment env) {
+               super(name);
+               this.env = env;
+       }
+
+       public StreamExecutionEnvironment getEnvironment() {
+               return env;
+       }
+       
+       public void build() {
+               markCircles();
+               for (EntranceProcessingItem src : getEntranceProcessingItems()) 
{
+                       ((FlinkEntranceProcessingItem) src).initialise();
+               }
+               
initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class)));
+       }
+
+       private void initComponents(ImmutableList<FlinkProcessingItem> 
flinkComponents) {
+               if (flinkComponents.isEmpty()) return;
+
+               for (FlinkProcessingItem comp : flinkComponents) {
+                       if (comp.canBeInitialised() && !comp.isInitialised() && 
!comp.isPartOfCircle()) {
+                               comp.initialise();
+                               comp.initialiseStreams();
+
+                       }//if component is part of one or more circle
+                       else if (comp.isPartOfCircle() && 
!comp.isInitialised()) {
+                               for (Integer circle : comp.getCircleIds()) {
+                                       //check if circle can be initialized
+                                       if (checkCircleReady(circle)) {
+                                               logger.debug("Circle: " + 
circle + " can be initialised");
+                                               initialiseCircle(circle);
+                                       } else {
+                                               logger.debug("Circle cannot be 
initialised");
+                                       }
+                               }
+                       }
+
+               }
+               
initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new 
Predicate<FlinkProcessingItem>() {
+                       @Override
+                       public boolean apply(FlinkProcessingItem 
flinkComponent) {
+                               return !flinkComponent.isInitialised();
+                       }
+               })));
+       }
+
+       private void markCircles(){
+               List<FlinkProcessingItem> pis = 
Lists.newArrayList(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class));
+               List<Integer>[] graph = new List[pis.size()];
+               FlinkProcessingItem[] processingItems = new 
FlinkProcessingItem[pis.size()];
+
+
+               for (int i=0;i<pis.size();i++) {
+                       graph[i] = new ArrayList<Integer>();
+               }
+               //construct the graph of the topology for the Processing Items 
(No entrance pi is included)
+               for (FlinkProcessingItem pi: pis) {
+                       processingItems[pi.getComponentId()] = pi;
+                       for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
is : pi.getInputStreams()) {
+                               if (is.f2 != -1) 
graph[is.f2].add(pi.getComponentId());
+                       }
+               }
+               for (int g=0;g<graph.length;g++)
+                       logger.debug(graph[g].toString());
+
+               CircleDetection detCircles = new CircleDetection();
+               List<List<Integer>> circles = detCircles.getCircles(graph);
+
+               //update PIs, regarding being part of a circle.
+               for (List<Integer> c : circles){
+                       List<FlinkProcessingItem> circle = new ArrayList<>();
+                       for (Integer it : c){
+                               circle.add(processingItems[it]);
+                               
processingItems[it].addPItoLoop(topologyLoops.size());
+                       }
+                       topologyLoops.add(circle);
+                       backEdges.add(circle.get(0).getComponentId());
+               }
+               logger.debug("Circles detected in the topology: " + circles);
+       }
+       
+
+       private boolean checkCircleReady(int circleId) {
+
+               List<Integer> circleIds = new ArrayList<>();
+
+               for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
+                       circleIds.add(pi.getComponentId());
+               }
+               //check that all incoming to the circle streams are initialised
+               for (FlinkProcessingItem procItem : 
topologyLoops.get(circleId)) {
+                       for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : procItem.getInputStreams()) {
+                               //if a inputStream is not initialized AND 
source of inputStream is not in the circle or a tail of other circle
+                               if ((!inputStream.f0.isInitialised()) && 
(!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
+                                       return false;
+                       }
+               }
+               return true;
+       }
+
+       private void initialiseCircle(int circleId) {
+               //get the head and tail of circle
+               FlinkProcessingItem tail = topologyLoops.get(circleId).get(0);
+               FlinkProcessingItem head = 
topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1);
+
+               //initialise source stream of the iteration, so as to use it 
for the iteration starting point
+               if (!head.isInitialised()) {
+                       head.setOnIteration(true);
+                       head.initialise();
+                       head.initialiseStreams();
+               }
+
+               //initialise all nodes after head
+               for (int node = topologyLoops.get(circleId).size() - 2; node >= 
0; node--) {
+                       topologyLoops.get(circleId).get(node).initialise();
+                       
topologyLoops.get(circleId).get(node).initialiseStreams();
+               }
+
+               ((IterativeDataStream) 
head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream());
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
new file mode 100644
index 0000000..16d050a
--- /dev/null
+++ 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
@@ -0,0 +1,42 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+public class SamoaType extends Tuple3<String, ContentEvent, String> {
+       public SamoaType() {
+               super();
+       }
+
+       private SamoaType(String key, ContentEvent event, String streamId) {
+               super(key, event, streamId);
+       }
+
+       public static SamoaType of(ContentEvent event, String streamId) {
+               String key = event.getKey() == null ? "none" : event.getKey();
+               return new SamoaType(key, event, streamId);
+       }
+}
\ No newline at end of file

Reply via email to