Repository: incubator-samoa
Updated Branches:
  refs/heads/master 1ef742b27 -> b86ab83d5 (forced update)


SAMOA-51: Update Flink Module to v0.10.1
Fix #44


Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/b86ab83d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/b86ab83d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/b86ab83d

Branch: refs/heads/master
Commit: b86ab83d5a8e72fdac92cbe72b7655210f4cac02
Parents: 4375bce
Author: Paris Carbone <[email protected]>
Authored: Fri Nov 20 16:43:40 2015 +0100
Committer: Gianmarco De Francisci Morales <[email protected]>
Committed: Sun Feb 7 16:34:13 2016 +0300

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 samoa-flink/pom.xml                             |   2 +-
 .../org/apache/samoa/flink/FlinkDoTask.java     |   7 -
 .../samoa/flink/helpers/CircleDetection.java    |  99 -------
 .../samoa/flink/helpers/CycleDetection.java     |  99 +++++++
 .../org/apache/samoa/flink/helpers/Utils.java   |   4 +-
 .../topology/impl/FlinkProcessingItem.java      |  55 ++--
 .../samoa/flink/topology/impl/FlinkStream.java  |  32 +--
 .../flink/topology/impl/FlinkTopology.java      | 278 ++++++++++---------
 9 files changed, 283 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 71b131f..af8fe98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
         <miniball.version>1.0.3</miniball.version>
         <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
-        <flink.version>0.9.0</flink.version>
+        <flink.version>0.10.1</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
index f0266fa..5575643 100644
--- a/samoa-flink/pom.xml
+++ b/samoa-flink/pom.xml
@@ -70,7 +70,7 @@
         </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-core</artifactId>
+                       <artifactId>flink-streaming-java</artifactId>
                        <version>${flink.version}</version>
             <!--<scope>provided</scope>-->
                </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
index cd0b82c..7805371 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
@@ -21,16 +21,9 @@ package org.apache.samoa.flink;
  */
 
 import com.github.javacliparser.ClassOption;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.samoa.flink.helpers.CircleDetection;
-import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.flink.topology.impl.FlinkComponentFactory;
-import org.apache.samoa.flink.topology.impl.FlinkProcessingItem;
-import org.apache.samoa.flink.topology.impl.FlinkStream;
 import org.apache.samoa.flink.topology.impl.FlinkTopology;
 import org.apache.samoa.tasks.Task;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
deleted file mode 100644
index 400e49c..0000000
--- 
a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package org.apache.samoa.flink.helpers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
-
-/**
- * This class contains all logic needed in order to mark circles in job graphs 
explicitly such as 
- * in the case of Apache Flink. A circle is defined as a list of node ids 
ordered in topological 
- * (DFS) order.
- * 
- */
-public class CircleDetection {
-       private int[] index;
-       private int[] lowLink;
-       private int counter;
-       private Stack<Integer> stack;
-       private List<List<Integer>> scc;
-       List<Integer>[] graph;
-
-
-       public CircleDetection() {
-               stack = new Stack<Integer>();
-               scc = new ArrayList<>();
-       }
-
-       public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
-               graph = adjacencyList;
-               index = new int[adjacencyList.length];
-               lowLink = new int[adjacencyList.length];
-               counter = 0;
-
-               //initialize index and lowLink as "undefined"(=-1)
-               for (int j = 0; j < graph.length; j++) {
-                       index[j] = -1;
-                       lowLink[j] = -1;
-               }
-               for (int v = 0; v < graph.length; v++) {
-                       if (index[v] == -1) { //undefined.
-                               findSCC(v);
-                       }
-               }
-               return scc;
-       }
-
-       private void findSCC(int node) {
-               index[node] = counter;
-               lowLink[node] = counter;
-               counter++;
-               stack.push(node);
-
-               for (int neighbor : graph[node]) {
-                       if (index[neighbor] == -1) {
-                               findSCC(neighbor);
-                               lowLink[node] = Math.min(lowLink[node], 
lowLink[neighbor]);
-                       } else if (stack.contains(neighbor)) { //if neighbor 
has been already visited
-                               lowLink[node] = Math.min(lowLink[node], 
index[neighbor]);
-                               List<Integer> sccComponent = new 
ArrayList<Integer>();
-                               int w;
-                               do {
-                                       w = stack.pop();
-                                       sccComponent.add(w);
-                               } while (neighbor != w);
-                               //add neighbor again, just in case it is a 
member of another circle 
-                               stack.add(neighbor); 
-                               scc.add(sccComponent);
-                       }
-
-               }
-               if (lowLink[node] == index[node]) {
-                       int w;
-                       do {
-                               w = stack.pop();
-                       } while (node != w);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
new file mode 100644
index 0000000..9aedb25
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
@@ -0,0 +1,99 @@
+package org.apache.samoa.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * This class contains all logic needed in order to mark cycles in job graphs 
explicitly such as 
+ * in the case of Apache Flink. A cycle is defined as a list of node ids 
ordered in topological 
+ * (DFS) order.
+ * 
+ */
+public class CycleDetection {
+       private int[] index;
+       private int[] lowLink;
+       private int counter;
+       private Stack<Integer> stack;
+       private List<List<Integer>> scc;
+       List<Integer>[] graph;
+
+
+       public CycleDetection() {
+               stack = new Stack<>();
+               scc = new ArrayList<>();
+       }
+
+       public List<List<Integer>> getCycles(List<Integer>[] adjacencyList) {
+               graph = adjacencyList;
+               index = new int[adjacencyList.length];
+               lowLink = new int[adjacencyList.length];
+               counter = 0;
+
+               //initialize index and lowLink as "undefined"(=-1)
+               for (int j = 0; j < graph.length; j++) {
+                       index[j] = -1;
+                       lowLink[j] = -1;
+               }
+               for (int v = 0; v < graph.length; v++) {
+                       if (index[v] == -1) { //undefined.
+                               findSCC(v);
+                       }
+               }
+               return scc;
+       }
+
+       private void findSCC(int node) {
+               index[node] = counter;
+               lowLink[node] = counter;
+               counter++;
+               stack.push(node);
+
+               for (int neighbor : graph[node]) {
+                       if (index[neighbor] == -1) {
+                               findSCC(neighbor);
+                               lowLink[node] = Math.min(lowLink[node], 
lowLink[neighbor]);
+                       } else if (stack.contains(neighbor)) { //if neighbor 
has been already visited
+                               lowLink[node] = Math.min(lowLink[node], 
index[neighbor]);
+                               List<Integer> sccComponent = new 
ArrayList<Integer>();
+                               int w;
+                               do {
+                                       w = stack.pop();
+                                       sccComponent.add(w);
+                               } while (neighbor != w);
+                               //add neighbor again, just in case it is a 
member of another cycle 
+                               stack.add(neighbor); 
+                               scc.add(sccComponent);
+                       }
+
+               }
+               if (lowLink[node] == index[node]) {
+                       int w;
+                       do {
+                               w = stack.pop();
+                       } while (node != w);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
index 38b4bdc..ce01567 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
@@ -32,8 +32,6 @@ import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.flink.topology.impl.SamoaType;
 import org.apache.samoa.utils.PartitioningScheme;
 
-import java.util.List;
-
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 
 public class Utils {
@@ -45,7 +43,7 @@ public class Utils {
                        case BROADCAST:
                                return stream.broadcast();
                        case GROUP_BY_KEY:
-                               return stream.groupBy(new 
KeySelector<SamoaType, String>() {
+                               return stream.keyBy(new KeySelector<SamoaType, 
String>() {
                                        @Override
                                        public String getKey(SamoaType 
samoaType) throws Exception {
                                                return samoaType.f0;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
index 28701df..9e3c880 100644
--- 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
@@ -22,20 +22,20 @@ package org.apache.samoa.flink.topology.impl;
 
 
 import com.google.common.collect.Lists;
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.Processor;
 import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.topology.ProcessingItem;
 import org.apache.samoa.topology.Stream;
 import org.apache.samoa.utils.PartitioningScheme;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,9 +60,8 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
        private int parallelism;
        private static int numberOfPIs = 0;
        private int piID;
-       private List<Integer> circleId; //check if we can refactor this
+       private List<Integer> cycleId; //check if we can refactor this
        private boolean onIteration;
-       //private int circleId; //check if we can refactor this
 
        public FlinkProcessingItem(StreamExecutionEnvironment env, Processor 
proc) {
                this(env, proc, 1);
@@ -79,8 +78,8 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
                this.processor = proc;
                this.parallelism = parallelism;
                this.piID = numberOfPIs++;
-               this.circleId = new ArrayList<Integer>() {
-               }; // if size equals 0, then it is part of no circle
+               this.cycleId = new ArrayList<Integer>() {
+               }; // if size equals 0, then it is part of no cycle
        }
 
        public Stream createStream() {
@@ -90,12 +89,12 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
        }
 
        public void putToStream(ContentEvent data, Stream targetStream) {
-               output.collect(SamoaType.of(data, targetStream.getStreamId()));
+               output.collect(new StreamRecord<>(SamoaType.of(data, 
targetStream.getStreamId())));
        }
-
+       
        @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
+       public void open() throws Exception {
+               super.open();
                this.processor.onCreate(getComponentId());
        }
 
@@ -148,8 +147,13 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
        }
 
        @Override
-       public void processElement(SamoaType samoaType) throws Exception {
-               fun.processEvent(samoaType.f1);
+       public void processElement(StreamRecord<SamoaType> streamRecord) throws 
Exception {
+               fun.processEvent(streamRecord.getValue().f1);
+       }
+
+       @Override
+       public void processWatermark(Watermark watermark) throws Exception {
+
        }
 
        @Override
@@ -175,10 +179,6 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
                return parallelism;
        }
 
-       public void setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-       }
-
        public List<FlinkStream> getOutputStreams() {
                return outputStreams;
        }
@@ -187,28 +187,24 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
                return this.outStream;
        }
 
-       public void setOutStream(DataStream outStream) {
-               this.outStream = outStream;
-       }
-
        @Override
        public int getComponentId() {
                return piID;
        }
 
-       public boolean isPartOfCircle() {
-               return this.circleId.size() > 0;
+       public boolean isPartOfCycle() {
+               return this.cycleId.size() > 0;
        }
 
-       public List<Integer> getCircleIds() {
-               return circleId;
+       public List<Integer> getCycleIds() {
+               return cycleId;
        }
 
-       public void addPItoLoop(int piId) {
-               this.circleId.add(piId);
+       public void addPItoCycle(int piId) {
+               this.cycleId.add(piId);
        }
 
-       public DataStream<SamoaType> getInStream() {
+       public DataStream<SamoaType> getDataStream() {
                return inStream;
        }
 
@@ -219,6 +215,7 @@ public class FlinkProcessingItem extends 
AbstractUdfStreamOperator<SamoaType, Fl
        public void setOnIteration(boolean onIteration) {
                this.onIteration = onIteration;
        }
+       
 
        static class SamoaDelegateFunction implements Function, Serializable {
                private final Processor proc;

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
index 31617a7..286802c 100644
--- 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
@@ -36,46 +36,44 @@ import java.io.Serializable;
 public class FlinkStream extends AbstractStream implements FlinkComponent, 
Serializable {
 
        private static int outputCounter = 0;
-       private FlinkComponent procItem;
-       private transient DataStream<SamoaType> dataStream;
-       private int sourcePiId;
-       private String flinkStreamId;
+       private FlinkComponent sourceComponent;
+       private transient DataStream<SamoaType> filteredStream;
+       private String filterID;
 
        public FlinkStream(FlinkComponent sourcePi) {
-               this.procItem = sourcePi;
-               this.sourcePiId = sourcePi.getComponentId();
+               this.sourceComponent = sourcePi;
                setStreamId("stream-" + Integer.toString(outputCounter));
-               flinkStreamId = "stream-" + Integer.toString(outputCounter);
+               filterID = "stream-" + Integer.toString(outputCounter);
                outputCounter++;
        }
 
        @Override
        public void initialise() {
-               if (procItem instanceof FlinkProcessingItem) {
-                       dataStream = 
procItem.getOutStream().filter(Utils.getFilter(getStreamId()))
-                       .setParallelism(((FlinkProcessingItem) 
procItem).getParallelism());
+               if (sourceComponent instanceof FlinkProcessingItem) {
+                       filteredStream = 
sourceComponent.getOutStream().filter(Utils.getFilter(getStreamId()))
+                       .setParallelism(((FlinkProcessingItem) 
sourceComponent).getParallelism());
                } else
-                       dataStream = procItem.getOutStream();
+                       filteredStream = sourceComponent.getOutStream();
        }
 
        @Override
        public boolean canBeInitialised() {
-               return procItem.isInitialised();
+               return sourceComponent.isInitialised();
        }
 
        @Override
        public boolean isInitialised() {
-               return dataStream != null;
+               return filteredStream != null;
        }
 
        @Override
        public DataStream getOutStream() {
-               return dataStream;
+               return filteredStream;
        }
 
        @Override
        public void put(ContentEvent event) {
-               ((FlinkProcessingItem) procItem).putToStream(event, this);
+               ((FlinkProcessingItem) sourceComponent).putToStream(event, 
this);
        }
 
        @Override
@@ -84,11 +82,11 @@ public class FlinkStream extends AbstractStream implements 
FlinkComponent, Seria
        }
 
        public int getSourcePiId() {
-               return sourcePiId;
+               return sourceComponent.getComponentId();
        }
 
        @Override
        public String getStreamId() {
-               return flinkStreamId;
+               return filterID;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/b86ab83d/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
index 65c52c6..a09ba71 100644
--- 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
@@ -21,17 +21,15 @@ package org.apache.samoa.flink.topology.impl;
  */
 
 
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.samoa.flink.helpers.CircleDetection;
-import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.flink.helpers.CycleDetection;
 import org.apache.samoa.topology.AbstractTopology;
 import org.apache.samoa.topology.EntranceProcessingItem;
 import org.apache.samoa.utils.PartitioningScheme;
@@ -43,144 +41,148 @@ import java.util.List;
 
 /**
  * A SAMOA topology on Apache Flink
- * 
+ * <p/>
  * A Samoa-Flink Streaming Topology is DAG of ProcessingItems encapsulated 
within custom operators.
- * Streams are tagged and filtered in each operator's output so they can be 
routed to the right 
+ * Streams are tagged and filtered in each operator's output so they can be 
routed to the right
  * operator respectively. Building a Flink topology from a Samoa task involves 
invoking all these
- * stream transformations and finally, marking and initiating loops in the 
graph. We have to do that
- * since Flink only allows explicit loops in the topology started with 
'iterate()' and closed with
- * 'closeWith()'. Thus, when we build a flink topology we have to do it 
incrementally from the 
- * sources, mark loops and initialize them with explicit iterations.
- * 
+ * stream transformations and finally, marking and initiating cycles in the 
graph. We have to do that
+ * since Flink only allows explicit cycles in the topology started with 
'iterate()' and closed with
+ * 'closeWith()'. Thus, when we build a flink topology we have to do it 
incrementally from the
+ * sources, mark cycles and initialize them with explicit iterations.
  */
 public class FlinkTopology extends AbstractTopology {
 
-       private static final Logger logger = 
LoggerFactory.getLogger(FlinkTopology.class);
-       public static StreamExecutionEnvironment env;
-       public List<List<FlinkProcessingItem>> topologyLoops = new 
ArrayList<>();
-       public  List<Integer> backEdges = new ArrayList<Integer>();
-
-       public FlinkTopology(String name, StreamExecutionEnvironment env) {
-               super(name);
-               this.env = env;
-       }
-
-       public StreamExecutionEnvironment getEnvironment() {
-               return env;
-       }
-       
-       public void build() {
-               markCircles();
-               for (EntranceProcessingItem src : getEntranceProcessingItems()) 
{
-                       ((FlinkEntranceProcessingItem) src).initialise();
-               }
-               
initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class)));
-       }
-
-       private void initComponents(ImmutableList<FlinkProcessingItem> 
flinkComponents) {
-               if (flinkComponents.isEmpty()) return;
-
-               for (FlinkProcessingItem comp : flinkComponents) {
-                       if (comp.canBeInitialised() && !comp.isInitialised() && 
!comp.isPartOfCircle()) {
-                               comp.initialise();
-                               comp.initialiseStreams();
-
-                       }//if component is part of one or more circle
-                       else if (comp.isPartOfCircle() && 
!comp.isInitialised()) {
-                               for (Integer circle : comp.getCircleIds()) {
-                                       //check if circle can be initialized
-                                       if (checkCircleReady(circle)) {
-                                               logger.debug("Circle: " + 
circle + " can be initialised");
-                                               initialiseCircle(circle);
-                                       } else {
-                                               logger.debug("Circle cannot be 
initialised");
-                                       }
-                               }
-                       }
-
-               }
-               
initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new 
Predicate<FlinkProcessingItem>() {
-                       @Override
-                       public boolean apply(FlinkProcessingItem 
flinkComponent) {
-                               return !flinkComponent.isInitialised();
-                       }
-               })));
-       }
-
-       private void markCircles(){
-               List<FlinkProcessingItem> pis = 
Lists.newArrayList(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class));
-               List<Integer>[] graph = new List[pis.size()];
-               FlinkProcessingItem[] processingItems = new 
FlinkProcessingItem[pis.size()];
-
-
-               for (int i=0;i<pis.size();i++) {
-                       graph[i] = new ArrayList<Integer>();
-               }
-               //construct the graph of the topology for the Processing Items 
(No entrance pi is included)
-               for (FlinkProcessingItem pi: pis) {
-                       processingItems[pi.getComponentId()] = pi;
-                       for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
is : pi.getInputStreams()) {
-                               if (is.f2 != -1) 
graph[is.f2].add(pi.getComponentId());
-                       }
-               }
-               for (int g=0;g<graph.length;g++)
-                       logger.debug(graph[g].toString());
-
-               CircleDetection detCircles = new CircleDetection();
-               List<List<Integer>> circles = detCircles.getCircles(graph);
-
-               //update PIs, regarding being part of a circle.
-               for (List<Integer> c : circles){
-                       List<FlinkProcessingItem> circle = new ArrayList<>();
-                       for (Integer it : c){
-                               circle.add(processingItems[it]);
-                               
processingItems[it].addPItoLoop(topologyLoops.size());
-                       }
-                       topologyLoops.add(circle);
-                       backEdges.add(circle.get(0).getComponentId());
-               }
-               logger.debug("Circles detected in the topology: " + circles);
-       }
-       
-
-       private boolean checkCircleReady(int circleId) {
-
-               List<Integer> circleIds = new ArrayList<>();
-
-               for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
-                       circleIds.add(pi.getComponentId());
-               }
-               //check that all incoming to the circle streams are initialised
-               for (FlinkProcessingItem procItem : 
topologyLoops.get(circleId)) {
-                       for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : procItem.getInputStreams()) {
-                               //if a inputStream is not initialized AND 
source of inputStream is not in the circle or a tail of other circle
-                               if ((!inputStream.f0.isInitialised()) && 
(!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
-                                       return false;
-                       }
-               }
-               return true;
-       }
-
-       private void initialiseCircle(int circleId) {
-               //get the head and tail of circle
-               FlinkProcessingItem tail = topologyLoops.get(circleId).get(0);
-               FlinkProcessingItem head = 
topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1);
-
-               //initialise source stream of the iteration, so as to use it 
for the iteration starting point
-               if (!head.isInitialised()) {
-                       head.setOnIteration(true);
-                       head.initialise();
-                       head.initialiseStreams();
-               }
-
-               //initialise all nodes after head
-               for (int node = topologyLoops.get(circleId).size() - 2; node >= 
0; node--) {
-                       topologyLoops.get(circleId).get(node).initialise();
-                       
topologyLoops.get(circleId).get(node).initialiseStreams();
-               }
-
-               ((IterativeDataStream) 
head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream());
-       }
    private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
    // NOTE(review): 'env' is declared static but is assigned through 'this' in the
    // constructor below, so every FlinkTopology instance shares — and silently
    // overwrites — the same execution environment. Presumably only one topology is
    // built per JVM; confirm before supporting multiple concurrent topologies.
    public static StreamExecutionEnvironment env;
    // Cycles detected in the topology graph. Each inner list holds the processing
    // items of one cycle, with the cycle's tail at index 0 and its head last
    // (this ordering is relied upon by initializeCycle).
    public List<List<FlinkProcessingItem>> cycles = new ArrayList<>();
    // Component ids of each cycle's tail node, i.e. the sources of the back-edges.
    public List<Integer> backEdges = new ArrayList<Integer>(); 

    public FlinkTopology(String name, StreamExecutionEnvironment env) {
        super(name);
        this.env = env; // writes the shared static field (see note above)
    }

    public StreamExecutionEnvironment getEnvironment() {
        return env;
    }
+
+    public void build() {
+        markCycles();
+        for (EntranceProcessingItem src : getEntranceProcessingItems()) {
+            ((FlinkEntranceProcessingItem) src).initialise();
+        }
+        
initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class)));
+    }
+
+    private void initComponents(ImmutableList<FlinkProcessingItem> 
flinkComponents) {
+        if (flinkComponents.isEmpty()) return;
+
+        for (FlinkProcessingItem comp : flinkComponents) {
+            if (comp.canBeInitialised() && !comp.isInitialised() && 
!comp.isPartOfCycle()) {
+                comp.initialise();
+                comp.initialiseStreams();
+
+            }//if component is part of one or more cycle
+            else if (comp.isPartOfCycle() && !comp.isInitialised()) {
+                for (Integer cycle : comp.getCycleIds()) {
+                    //check if cycle can be initialized
+                    if (completenessCheck(cycle)) {
+                        logger.debug("Cycle: " + cycle + " can be 
initialised");
+                        initializeCycle(cycle);
+                    } else {
+                        logger.debug("Cycle cannot be initialised");
+                    }
+                }
+            }
+        }
+        initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, 
new Predicate<FlinkProcessingItem>() {
+            @Override
+            public boolean apply(FlinkProcessingItem flinkComponent) {
+                return !flinkComponent.isInitialised();
+            }
+        })));
+    }
+
    /**
     * Detects and marks all cycles and backedges needed to construct a Flink topology.
     * Populates {@code cycles} with the detected cycles and {@code backEdges} with
     * the component id of each cycle's tail node.
     */
    private void markCycles() {
        List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
        // Adjacency lists indexed by component id.
        // NOTE(review): this assumes component ids are dense in [0, pis.size()) —
        // confirm; otherwise the array accesses below can throw
        // ArrayIndexOutOfBoundsException.
        List<Integer>[] graph = new List[pis.size()];
        FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];


        for (int i = 0; i < pis.size(); i++) {
            graph[i] = new ArrayList<>();
        }
        //construct the graph of the topology for the Processing Items (No entrance pi is included)
        for (FlinkProcessingItem pi : pis) {
            processingItems[pi.getComponentId()] = pi;
            for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
                // is.f2 is the source component id; -1 presumably marks a stream
                // whose source is not a regular processing item (e.g. an entrance
                // item) — such edges are excluded from the graph.
                if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
            }
        }
        // Dump the adjacency lists for debugging.
        for (int g = 0; g < graph.length; g++)
            logger.debug(graph[g].toString());

        CycleDetection detCycles = new CycleDetection();
        List<List<Integer>> graphCycles = detCycles.getCycles(graph);

        //update PIs, regarding being part of a cycle.
        for (List<Integer> c : graphCycles) {
            List<FlinkProcessingItem> cycle = new ArrayList<>();
            for (Integer it : c) {
                cycle.add(processingItems[it]);
                processingItems[it].addPItoCycle(cycles.size());
            }
            cycles.add(cycle);
            // The first node of each cycle is treated as its tail; its edge back
            // into the cycle is the back-edge (consumed by initializeCycle).
            backEdges.add(cycle.get(0).getComponentId());
        }
        logger.debug("Cycles detected in the topology: " + graphCycles);
    }
+
+
+    private boolean completenessCheck(int cycleId) {
+
+        List<Integer> cycleIDs = new ArrayList<>();
+
+        for (FlinkProcessingItem pi : cycles.get(cycleId)) {
+            cycleIDs.add(pi.getComponentId());
+        }
+        //check that all incoming to the cycle streams are initialised
+        for (FlinkProcessingItem procItem : cycles.get(cycleId)) {
+            for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream 
: procItem.getInputStreams()) {
+                //if a inputStream is not initialized AND source of 
inputStream is not in the cycle or a tail of other cycle
+                if ((!inputStream.f0.isInitialised()) && 
(!cycleIDs.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
+                    return false;
+            }
+        }
+        return true;
+    }
+
    /**
     * Initialises all processing items of the given cycle and wires up the Flink
     * iteration that realises it, closing the loop with the tail-to-head stream.
     *
     * @param cycleID index of the cycle in {@code cycles}
     */
    private void initializeCycle(int cycleID) {
        //get the head and tail of cycle
        // By construction (markCycles), index 0 holds the cycle's tail and the
        // last index holds its head.
        FlinkProcessingItem tail = cycles.get(cycleID).get(0);
        FlinkProcessingItem head = cycles.get(cycleID).get(cycles.get(cycleID).size() - 1);

        //initialise source stream of the iteration, so as to use it for the iteration starting point
        if (!head.isInitialised()) {
            head.setOnIteration(true);
            head.initialise();
            head.initialiseStreams();
        }

        //initialise all nodes after head
        // Walk from the node just before the head down to the tail (index 0).
        for (int node = cycles.get(cycleID).size() - 2; node >= 0; node--) {
            FlinkProcessingItem processingItem = cycles.get(cycleID).get(node);
            processingItem.initialise();
            processingItem.initialiseStreams();
        }

        // Close the iteration: the stream flowing from the tail back into the head
        // is the feedback edge. Its parallelism is forced to match the head's —
        // presumably to satisfy IterativeStream.closeWith's requirement that the
        // feedback parallelism equals the iteration head's (confirm against the
        // Flink 0.10 API).
        SingleOutputStreamOperator backedge = (SingleOutputStreamOperator) head.getInputStreamBySourceID(tail.getComponentId()).getOutStream();
        backedge.setParallelism(head.getParallelism());
        ((IterativeStream) head.getDataStream()).closeWith(backedge);
    }
 
 
 }

Reply via email to