Repository: incubator-samoa Updated Branches: refs/heads/master 4375bce7b -> 1ef742b27
SAMOA-51: Update Flink Module to v0.10.1 Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/1ef742b2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/1ef742b2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/1ef742b2 Branch: refs/heads/master Commit: 1ef742b27ffdd224687ac8adcab6c250755d82ed Parents: 4375bce Author: Paris Carbone <[email protected]> Authored: Fri Nov 20 16:43:40 2015 +0100 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Sun Feb 7 16:33:04 2016 +0300 ---------------------------------------------------------------------- pom.xml | 2 +- samoa-flink/pom.xml | 2 +- .../org/apache/samoa/flink/FlinkDoTask.java | 7 - .../samoa/flink/helpers/CircleDetection.java | 99 ------- .../samoa/flink/helpers/CycleDetection.java | 99 +++++++ .../org/apache/samoa/flink/helpers/Utils.java | 4 +- .../topology/impl/FlinkProcessingItem.java | 55 ++-- .../samoa/flink/topology/impl/FlinkStream.java | 32 +-- .../flink/topology/impl/FlinkTopology.java | 278 ++++++++++--------- 9 files changed, 283 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 71b131f..af8fe98 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,7 @@ <miniball.version>1.0.3</miniball.version> <s4.version>0.6.0-incubating</s4.version> <samza.version>0.7.0</samza.version> - <flink.version>0.9.0</flink.version> + <flink.version>0.10.1</flink.version> <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version> <slf4j-simple.version>1.7.5</slf4j-simple.version> <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml index f0266fa..5575643 100644 --- a/samoa-flink/pom.xml +++ b/samoa-flink/pom.xml @@ -70,7 +70,7 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-core</artifactId> + <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <!--<scope>provided</scope>--> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java index cd0b82c..7805371 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java @@ -21,16 +21,9 @@ package org.apache.samoa.flink; */ import com.github.javacliparser.ClassOption; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.samoa.flink.helpers.CircleDetection; -import org.apache.samoa.flink.helpers.Utils; import org.apache.samoa.flink.topology.impl.FlinkComponentFactory; -import org.apache.samoa.flink.topology.impl.FlinkProcessingItem; -import org.apache.samoa.flink.topology.impl.FlinkStream; import org.apache.samoa.flink.topology.impl.FlinkTopology; import org.apache.samoa.tasks.Task; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java deleted file mode 100644 index 400e49c..0000000 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.samoa.flink.helpers; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - - -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; - -/** - * This class contains all logic needed in order to mark circles in job graphs explicitly such as - * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological - * (DFS) order. - * - */ -public class CircleDetection { - private int[] index; - private int[] lowLink; - private int counter; - private Stack<Integer> stack; - private List<List<Integer>> scc; - List<Integer>[] graph; - - - public CircleDetection() { - stack = new Stack<Integer>(); - scc = new ArrayList<>(); - } - - public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) { - graph = adjacencyList; - index = new int[adjacencyList.length]; - lowLink = new int[adjacencyList.length]; - counter = 0; - - //initialize index and lowLink as "undefined"(=-1) - for (int j = 0; j < graph.length; j++) { - index[j] = -1; - lowLink[j] = -1; - } - for (int v = 0; v < graph.length; v++) { - if (index[v] == -1) { //undefined. - findSCC(v); - } - } - return scc; - } - - private void findSCC(int node) { - index[node] = counter; - lowLink[node] = counter; - counter++; - stack.push(node); - - for (int neighbor : graph[node]) { - if (index[neighbor] == -1) { - findSCC(neighbor); - lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]); - } else if (stack.contains(neighbor)) { //if neighbor has been already visited - lowLink[node] = Math.min(lowLink[node], index[neighbor]); - List<Integer> sccComponent = new ArrayList<Integer>(); - int w; - do { - w = stack.pop(); - sccComponent.add(w); - } while (neighbor != w); - //add neighbor again, just in case it is a member of another circle - stack.add(neighbor); - scc.add(sccComponent); - } - - } - if (lowLink[node] == index[node]) { - int w; - do { - w = stack.pop(); - } while (node != w); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java new file mode 100644 index 0000000..9aedb25 --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java @@ -0,0 +1,99 @@ +package org.apache.samoa.flink.helpers; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +/** + * This class contains all logic needed in order to mark cycles in job graphs explicitly such as + * in the case of Apache Flink. A cycle is defined as a list of node ids ordered in topological + * (DFS) order. + * + */ +public class CycleDetection { + private int[] index; + private int[] lowLink; + private int counter; + private Stack<Integer> stack; + private List<List<Integer>> scc; + List<Integer>[] graph; + + + public CycleDetection() { + stack = new Stack<>(); + scc = new ArrayList<>(); + } + + public List<List<Integer>> getCycles(List<Integer>[] adjacencyList) { + graph = adjacencyList; + index = new int[adjacencyList.length]; + lowLink = new int[adjacencyList.length]; + counter = 0; + + //initialize index and lowLink as "undefined"(=-1) + for (int j = 0; j < graph.length; j++) { + index[j] = -1; + lowLink[j] = -1; + } + for (int v = 0; v < graph.length; v++) { + if (index[v] == -1) { //undefined. + findSCC(v); + } + } + return scc; + } + + private void findSCC(int node) { + index[node] = counter; + lowLink[node] = counter; + counter++; + stack.push(node); + + for (int neighbor : graph[node]) { + if (index[neighbor] == -1) { + findSCC(neighbor); + lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]); + } else if (stack.contains(neighbor)) { //if neighbor has been already visited + lowLink[node] = Math.min(lowLink[node], index[neighbor]); + List<Integer> sccComponent = new ArrayList<Integer>(); + int w; + do { + w = stack.pop(); + sccComponent.add(w); + } while (neighbor != w); + //add neighbor again, just in case it is a member of another cycle + stack.add(neighbor); + scc.add(sccComponent); + } + + } + if (lowLink[node] == index[node]) { + int w; + do { + w = stack.pop(); + } while (node != w); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java index 38b4bdc..ce01567 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java @@ -32,8 +32,6 @@ import org.apache.samoa.core.ContentEvent; import org.apache.samoa.flink.topology.impl.SamoaType; import org.apache.samoa.utils.PartitioningScheme; -import java.util.List; - import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; public class Utils { @@ -45,7 +43,7 @@ public class Utils { case BROADCAST: return stream.broadcast(); case GROUP_BY_KEY: - return stream.groupBy(new KeySelector<SamoaType, String>() { + return stream.keyBy(new KeySelector<SamoaType, String>() { @Override public String getKey(SamoaType samoaType) throws Exception { return samoaType.f0; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java index 28701df..9e3c880 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java @@ -22,20 +22,20 @@ package org.apache.samoa.flink.topology.impl; import com.google.common.collect.Lists; - import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.samoa.core.ContentEvent; import org.apache.samoa.core.Processor; import org.apache.samoa.flink.helpers.Utils; import org.apache.samoa.topology.ProcessingItem; import org.apache.samoa.topology.Stream; import org.apache.samoa.utils.PartitioningScheme; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,9 +60,8 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl private int parallelism; private static int numberOfPIs = 0; private int piID; - private List<Integer> circleId; //check if we can refactor this + private List<Integer> cycleId; //check if we can refactor this private boolean onIteration; - //private int circleId; //check if we can refactor this public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) { this(env, proc, 1); @@ -79,8 +78,8 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl this.processor = proc; this.parallelism = parallelism; this.piID = numberOfPIs++; - this.circleId = new ArrayList<Integer>() { - }; // if size equals 0, then it is part of no circle + this.cycleId = new ArrayList<Integer>() { + }; // if size equals 0, then it is part of no cycle } public Stream createStream() { @@ -90,12 +89,12 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl } public void putToStream(ContentEvent data, Stream targetStream) { - output.collect(SamoaType.of(data, targetStream.getStreamId())); + output.collect(new StreamRecord<>(SamoaType.of(data, targetStream.getStreamId()))); } - + @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); + public void open() throws Exception { + super.open(); this.processor.onCreate(getComponentId()); } @@ -148,8 +147,13 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl } @Override - public void processElement(SamoaType samoaType) throws Exception { - fun.processEvent(samoaType.f1); + public void processElement(StreamRecord<SamoaType> streamRecord) throws Exception { + fun.processEvent(streamRecord.getValue().f1); + } + + @Override + public void processWatermark(Watermark watermark) throws Exception { + } @Override @@ -175,10 +179,6 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl return parallelism; } - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - public List<FlinkStream> getOutputStreams() { return outputStreams; } @@ -187,28 +187,24 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl return this.outStream; } - public void setOutStream(DataStream outStream) { - this.outStream = outStream; - } - @Override public int getComponentId() { return piID; } - public boolean isPartOfCircle() { - return this.circleId.size() > 0; + public boolean isPartOfCycle() { + return this.cycleId.size() > 0; } - public List<Integer> getCircleIds() { - return circleId; + public List<Integer> getCycleIds() { + return cycleId; } - public void addPItoLoop(int piId) { - this.circleId.add(piId); + public void addPItoCycle(int piId) { + this.cycleId.add(piId); } - public DataStream<SamoaType> getInStream() { + public DataStream<SamoaType> getDataStream() { return inStream; } @@ -219,6 +215,7 @@ public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, Fl public void setOnIteration(boolean onIteration) { this.onIteration = onIteration; } + static class SamoaDelegateFunction implements Function, Serializable { private final Processor proc; http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java index 31617a7..286802c 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java @@ -36,46 +36,44 @@ import java.io.Serializable; public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable { private static int outputCounter = 0; - private FlinkComponent procItem; - private transient DataStream<SamoaType> dataStream; - private int sourcePiId; - private String flinkStreamId; + private FlinkComponent sourceComponent; + private transient DataStream<SamoaType> filteredStream; + private String filterID; public FlinkStream(FlinkComponent sourcePi) { - this.procItem = sourcePi; - this.sourcePiId = sourcePi.getComponentId(); + this.sourceComponent = sourcePi; setStreamId("stream-" + Integer.toString(outputCounter)); - flinkStreamId = "stream-" + Integer.toString(outputCounter); + filterID = "stream-" + Integer.toString(outputCounter); outputCounter++; } @Override public void initialise() { - if (procItem instanceof FlinkProcessingItem) { - dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId())) - .setParallelism(((FlinkProcessingItem) procItem).getParallelism()); + if (sourceComponent instanceof FlinkProcessingItem) { + filteredStream = sourceComponent.getOutStream().filter(Utils.getFilter(getStreamId())) + .setParallelism(((FlinkProcessingItem) sourceComponent).getParallelism()); } else - dataStream = procItem.getOutStream(); + filteredStream = sourceComponent.getOutStream(); } @Override public boolean canBeInitialised() { - return procItem.isInitialised(); + return sourceComponent.isInitialised(); } @Override public boolean isInitialised() { - return dataStream != null; + return filteredStream != null; } @Override public DataStream getOutStream() { - return dataStream; + return filteredStream; } @Override public void put(ContentEvent event) { - ((FlinkProcessingItem) procItem).putToStream(event, this); + ((FlinkProcessingItem) sourceComponent).putToStream(event, this); } @Override @@ -84,11 +82,11 @@ public class FlinkStream extends AbstractStream implements FlinkComponent, Seria } public int getSourcePiId() { - return sourcePiId; + return sourceComponent.getComponentId(); } @Override public String getStreamId() { - return flinkStreamId; + return filterID; } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/1ef742b2/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java index 65c52c6..a09ba71 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java @@ -21,17 +21,15 @@ package org.apache.samoa.flink.topology.impl; */ - import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; - import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.datastream.IterativeDataStream; +import org.apache.flink.streaming.api.datastream.IterativeStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.samoa.flink.helpers.CircleDetection; -import org.apache.samoa.flink.helpers.Utils; +import org.apache.samoa.flink.helpers.CycleDetection; import org.apache.samoa.topology.AbstractTopology; import org.apache.samoa.topology.EntranceProcessingItem; import org.apache.samoa.utils.PartitioningScheme; @@ -43,144 +41,148 @@ import java.util.List; /** * A SAMOA topology on Apache Flink - * + * <p/> * A Samoa-Flink Streaming Topology is DAG of ProcessingItems encapsulated within custom operators. - * Streams are tagged and filtered in each operator's output so they can be routed to the right + * Streams are tagged and filtered in each operator's output so they can be routed to the right * operator respectively. Building a Flink topology from a Samoa task involves invoking all these - * stream transformations and finally, marking and initiating loops in the graph. We have to do that - * since Flink only allows explicit loops in the topology started with 'iterate()' and closed with - * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the - * sources, mark loops and initialize them with explicit iterations. - * + * stream transformations and finally, marking and initiating cycles in the graph. We have to do that + * since Flink only allows explicit cycles in the topology started with 'iterate()' and closed with + * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the + * sources, mark cycles and initialize them with explicit iterations. */ public class FlinkTopology extends AbstractTopology { - private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class); - public static StreamExecutionEnvironment env; - public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>(); - public List<Integer> backEdges = new ArrayList<Integer>(); - - public FlinkTopology(String name, StreamExecutionEnvironment env) { - super(name); - this.env = env; - } - - public StreamExecutionEnvironment getEnvironment() { - return env; - } - - public void build() { - markCircles(); - for (EntranceProcessingItem src : getEntranceProcessingItems()) { - ((FlinkEntranceProcessingItem) src).initialise(); - } - initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class))); - } - - private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) { - if (flinkComponents.isEmpty()) return; - - for (FlinkProcessingItem comp : flinkComponents) { - if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) { - comp.initialise(); - comp.initialiseStreams(); - - }//if component is part of one or more circle - else if (comp.isPartOfCircle() && !comp.isInitialised()) { - for (Integer circle : comp.getCircleIds()) { - //check if circle can be initialized - if (checkCircleReady(circle)) { - logger.debug("Circle: " + circle + " can be initialised"); - initialiseCircle(circle); - } else { - logger.debug("Circle cannot be initialised"); - } - } - } - - } - initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() { - @Override - public boolean apply(FlinkProcessingItem flinkComponent) { - return !flinkComponent.isInitialised(); - } - }))); - } - - private void markCircles(){ - List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)); - List<Integer>[] graph = new List[pis.size()]; - FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()]; - - - for (int i=0;i<pis.size();i++) { - graph[i] = new ArrayList<Integer>(); - } - //construct the graph of the topology for the Processing Items (No entrance pi is included) - for (FlinkProcessingItem pi: pis) { - processingItems[pi.getComponentId()] = pi; - for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) { - if (is.f2 != -1) graph[is.f2].add(pi.getComponentId()); - } - } - for (int g=0;g<graph.length;g++) - logger.debug(graph[g].toString()); - - CircleDetection detCircles = new CircleDetection(); - List<List<Integer>> circles = detCircles.getCircles(graph); - - //update PIs, regarding being part of a circle. - for (List<Integer> c : circles){ - List<FlinkProcessingItem> circle = new ArrayList<>(); - for (Integer it : c){ - circle.add(processingItems[it]); - processingItems[it].addPItoLoop(topologyLoops.size()); - } - topologyLoops.add(circle); - backEdges.add(circle.get(0).getComponentId()); - } - logger.debug("Circles detected in the topology: " + circles); - } - - - private boolean checkCircleReady(int circleId) { - - List<Integer> circleIds = new ArrayList<>(); - - for (FlinkProcessingItem pi : topologyLoops.get(circleId)) { - circleIds.add(pi.getComponentId()); - } - //check that all incoming to the circle streams are initialised - for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) { - for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) { - //if a inputStream is not initialized AND source of inputStream is not in the circle or a tail of other circle - if ((!inputStream.f0.isInitialised()) && (!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2))) - return false; - } - } - return true; - } - - private void initialiseCircle(int circleId) { - //get the head and tail of circle - FlinkProcessingItem tail = topologyLoops.get(circleId).get(0); - FlinkProcessingItem head = topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1); - - //initialise source stream of the iteration, so as to use it for the iteration starting point - if (!head.isInitialised()) { - head.setOnIteration(true); - head.initialise(); - head.initialiseStreams(); - } - - //initialise all nodes after head - for (int node = topologyLoops.get(circleId).size() - 2; node >= 0; node--) { - topologyLoops.get(circleId).get(node).initialise(); - topologyLoops.get(circleId).get(node).initialiseStreams(); - } - - ((IterativeDataStream) head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream()); - } + private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class); + public static StreamExecutionEnvironment env; + public List<List<FlinkProcessingItem>> cycles = new ArrayList<>(); + public List<Integer> backEdges = new ArrayList<Integer>(); + + public FlinkTopology(String name, StreamExecutionEnvironment env) { + super(name); + this.env = env; + } + + public StreamExecutionEnvironment getEnvironment() { + return env; + } + + public void build() { + markCycles(); + for (EntranceProcessingItem src : getEntranceProcessingItems()) { + ((FlinkEntranceProcessingItem) src).initialise(); + } + initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class))); + } + + private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) { + if (flinkComponents.isEmpty()) return; + + for (FlinkProcessingItem comp : flinkComponents) { + if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCycle()) { + comp.initialise(); + comp.initialiseStreams(); + + }//if component is part of one or more cycle + else if (comp.isPartOfCycle() && !comp.isInitialised()) { + for (Integer cycle : comp.getCycleIds()) { + //check if cycle can be initialized + if (completenessCheck(cycle)) { + logger.debug("Cycle: " + cycle + " can be initialised"); + initializeCycle(cycle); + } else { + logger.debug("Cycle cannot be initialised"); + } + } + } + } + initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() { + @Override + public boolean apply(FlinkProcessingItem flinkComponent) { + return !flinkComponent.isInitialised(); + } + }))); + } + + /** + * Detects and marks all cycles and backedges needed to construct a Flink topology + */ + private void markCycles() { + List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)); + List<Integer>[] graph = new List[pis.size()]; + FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()]; + + + for (int i = 0; i < pis.size(); i++) { + graph[i] = new ArrayList<>(); + } + //construct the graph of the topology for the Processing Items (No entrance pi is included) + for (FlinkProcessingItem pi : pis) { + processingItems[pi.getComponentId()] = pi; + for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) { + if (is.f2 != -1) graph[is.f2].add(pi.getComponentId()); + } + } + for (int g = 0; g < graph.length; g++) + logger.debug(graph[g].toString()); + + CycleDetection detCycles = new CycleDetection(); + List<List<Integer>> graphCycles = detCycles.getCycles(graph); + + //update PIs, regarding being part of a cycle. + for (List<Integer> c : graphCycles) { + List<FlinkProcessingItem> cycle = new ArrayList<>(); + for (Integer it : c) { + cycle.add(processingItems[it]); + processingItems[it].addPItoCycle(cycles.size()); + } + cycles.add(cycle); + backEdges.add(cycle.get(0).getComponentId()); + } + logger.debug("Cycles detected in the topology: " + graphCycles); + } + + + private boolean completenessCheck(int cycleId) { + + List<Integer> cycleIDs = new ArrayList<>(); + + for (FlinkProcessingItem pi : cycles.get(cycleId)) { + cycleIDs.add(pi.getComponentId()); + } + //check that all incoming to the cycle streams are initialised + for (FlinkProcessingItem procItem : cycles.get(cycleId)) { + for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) { + //if a inputStream is not initialized AND source of inputStream is not in the cycle or a tail of other cycle + if ((!inputStream.f0.isInitialised()) && (!cycleIDs.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2))) + return false; + } + } + return true; + } + + private void initializeCycle(int cycleID) { + //get the head and tail of cycle + FlinkProcessingItem tail = cycles.get(cycleID).get(0); + FlinkProcessingItem head = cycles.get(cycleID).get(cycles.get(cycleID).size() - 1); + + //initialise source stream of the iteration, so as to use it for the iteration starting point + if (!head.isInitialised()) { + head.setOnIteration(true); + head.initialise(); + head.initialiseStreams(); + } + + //initialise all nodes after head + for (int node = cycles.get(cycleID).size() - 2; node >= 0; node--) { + FlinkProcessingItem processingItem = cycles.get(cycleID).get(node); + processingItem.initialise(); + processingItem.initialiseStreams(); + } + + SingleOutputStreamOperator backedge = (SingleOutputStreamOperator) head.getInputStreamBySourceID(tail.getComponentId()).getOutStream(); + backedge.setParallelism(head.getParallelism()); + ((IterativeStream) head.getDataStream()).closeWith(backedge); + } }
