Repository: incubator-samoa Updated Branches: refs/heads/master 1b3529983 -> 64ef7a921
SAMOA-16: Add an adapter for Apache Flink-Streaming (senorcarbone) Fix #11 Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/64ef7a92 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/64ef7a92 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/64ef7a92 Branch: refs/heads/master Commit: 64ef7a921833b020ea9e556579f0d933e07b0a63 Parents: 1b35299 Author: Gianmarco De Francisci Morales <[email protected]> Authored: Tue May 26 11:30:35 2015 +0300 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Tue May 26 11:30:35 2015 +0300 ---------------------------------------------------------------------- .gitignore | 3 +- bin/samoa | 62 ++++- bin/samoa-flink.properties | 35 +++ pom.xml | 12 + samoa-flink/pom.xml | 134 ++++++++++ .../java/com/yahoo/labs/flink/FlinkDoTask.java | 87 +++++++ .../labs/flink/helpers/CircleDetection.java | 99 ++++++++ .../com/yahoo/labs/flink/helpers/Utils.java | 69 ++++++ .../flink/topology/impl/FlinkComponent.java | 68 +++++ .../topology/impl/FlinkComponentFactory.java | 66 +++++ .../impl/FlinkEntranceProcessingItem.java | 101 ++++++++ .../topology/impl/FlinkProcessingItem.java | 248 +++++++++++++++++++ .../labs/flink/topology/impl/FlinkStream.java | 94 +++++++ .../labs/flink/topology/impl/FlinkTopology.java | 185 ++++++++++++++ .../labs/flink/topology/impl/SamoaType.java | 42 ++++ 15 files changed, 1303 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 3cf0208..294c718 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ target/ #intellij .idea/ -.iml +*.iml +*.iws http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/bin/samoa 
---------------------------------------------------------------------- diff --git a/bin/samoa b/bin/samoa index b34f65b..0ace74b 100755 --- a/bin/samoa +++ b/bin/samoa @@ -4,7 +4,7 @@ # #%L # SAMOA # %% -# Copyright (C) 2013 Yahoo! Inc. +# Copyright (C) 2014 - 2015 Apache Software Foundation # %% # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -281,6 +281,66 @@ elif [ $PLATFORM = 'SAMZA' ]; then --kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE --pi_per_container=$PI_PER_CONTAINER \ --samoa_hdfs_dir=$HDFS_SAMOA_HOME +elif [ $PLATFORM = 'FLINK' ]; then + + echo "Deploying to $PLATFORM" + if [ -z $FLINK_HOME ];then + echo "FLINK_HOME is not set!" + echo "Please set FLINK_HOME to point to your Flink installation" + exit -1 + fi + + if [ ! -f $2 ];then + echo "$2 does not exist!" + echo "Please use a valid jar file for Flink execution" + exit -1 + fi + + FLINK_EXEC="$FLINK_HOME/bin/flink" + + SAMOA_FLINK_PROPERTIES="samoa-flink.properties" + MODE_OPTION="samoa.flink.mode" +# NUM_WORKER_OPTION="samoa.flink.numWorker" + + VALUE="" + getvalue() + { + VALUE=`sed '/^\#/d' $BASE_DIR/$SAMOA_FLINK_PROPERTIES | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'` + } + +# getvalue "$NUM_WORKER_OPTION" +# NUM_WORKER="$VALUE" + + getvalue "$MODE_OPTION" + MODE_ARG="$VALUE" + + COMPLETE_ARG="" + COUNTER=0 + for var in "$@" + do + COUNTER=`expr $COUNTER + 1` + if [ $COUNTER -gt 2 ];then + COMPLETE_ARG="$COMPLETE_ARG $var" + fi + done + + DEPLOYABLE=$JAR_PATH + echo "$DEPLOYABLE" + if [ "$MODE_ARG" = "cluster" ]; then + FLINK_MASTER_OPTION="samoa.flink.flinkMaster" + PORT_OPTION="samoa.flink.port" + + getvalue "$FLINK_MASTER_OPTION" + FLINK_MASTER_OPTION="$VALUE" + + getvalue "$PORT_OPTION" + PORT_OPTION="$VALUE" + $FLINK_EXEC run -m $FLINK_MASTER_OPTION:$PORT_OPTION $DEPLOYABLE $COMPLETE_ARG + + elif [ "$MODE_ARG" = "local" ]; then + $FLINK_EXEC run $DEPLOYABLE 
$COMPLETE_ARG + fi + elif [ $PLATFORM = 'THREADS' ]; then echo "Deploying to LOCAL with MULTITHREADING." http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/bin/samoa-flink.properties ---------------------------------------------------------------------- diff --git a/bin/samoa-flink.properties b/bin/samoa-flink.properties new file mode 100644 index 0000000..b9f56c0 --- /dev/null +++ b/bin/samoa-flink.properties @@ -0,0 +1,35 @@ +### +# #%L +# SAMOA +# %% +# Copyright (C) 2014 - 2015 Apache Software Foundation +# %% +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# #L% +### + +# SAMOA Flink properties file +# This file contains the configuration specific to deploying SAMOA on the Flink platform + +# samoa.flink.mode selects the execution mode of a SAMOA task in Flink +# possible values: +# 1. local - to run the task in a local Flink streaming environment +# 2. 
cluster - to run the task in the specified cluster +samoa.flink.mode=cluster + +# If samoa.flink.mode is "cluster", the following parameters must also be set: +# @samoa.flink.flinkMaster: the IP address of the cluster's Flink master +# @samoa.flink.port : the port of the Flink master +samoa.flink.flinkMaster=127.0.0.1 +samoa.flink.port=6123 + http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 622d952..819a13c 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,15 @@ </modules> </profile> <profile> + <id>flink</id> + <modules> + <module>samoa-instances</module> + <module>samoa-api</module> + <module>samoa-flink</module> + <module>samoa-test</module> + </modules> + </profile> + <profile> <id>samza</id> <modules> <module>samoa-instances</module> @@ -104,6 +113,7 @@ <module>samoa-local</module> <module>samoa-threads</module> <module>samoa-storm</module> + <module>samoa-flink</module> <module>samoa-s4</module> <module>samoa-samza</module> <module>samoa-test</module> @@ -130,6 +140,7 @@ <miniball.version>1.0.3</miniball.version> <s4.version>0.6.0-incubating</s4.version> <samza.version>0.7.0</samza.version> + <flink.version>0.9.0-milestone-1</flink.version> <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version> <slf4j-simple.version>1.7.5</slf4j-simple.version> <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version> @@ -211,6 +222,7 @@ <root>samoa-local</root> <root>samoa-storm</root> <root>samoa-s4</root> + <root>samoa-flink</root> <root>samoa-samza</root> <root>bin</root> </roots> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml new file mode 100644 index 0000000..f00fe3c --- /dev/null +++ b/samoa-flink/pom.xml @@ -0,0 +1,134 @@ +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 Yahoo!
Inc. + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + #L% + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <kryo.version>2.24.0</kryo.version> + </properties> + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + <name>samoa-flink</name> + <description>Flink engine for SAMOA</description> + + <artifactId>samoa-flink</artifactId> + <parent> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + + + + <dependencies> + <dependency> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa-api</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> 
+ <version>${slf4j-log4j12.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-core</artifactId> + <version>${flink.version}</version> + <!--<scope>provided</scope>--> + </dependency> + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>${kryo.version}</version> + </dependency> + </dependencies> + + + <build> + <plugins> + <!-- Flink assembly --> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven-assembly-plugin.version}</version> + <configuration> + <finalName>SAMOA-Flink-${project.version}</finalName> + <appendAssemblyId>false</appendAssemblyId> + <attach>false</attach> + <outputDirectory>../target</outputDirectory> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifestEntries> + <!--<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>--> + <Bundle-Description>${project.description}</Bundle-Description> + <Implementation-Version>${project.version}</Implementation-Version> + <Implementation-Vendor>Yahoo Labs</Implementation-Vendor> + <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id> + </manifestEntries> + <manifest> + <addClasspath>true</addClasspath> + <mainClass>com.yahoo.labs.flink.FlinkDoTask</mainClass> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <id>make-assembly</id> <!-- this is used for inheritance merges --> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>${maven-surefire-plugin.version}</version> + <configuration> + <argLine>-Xmx1G</argLine> + <redirectTestOutputToFile>false</redirectTestOutputToFile> + </configuration> + </plugin> + </plugins> + </build> + +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java new file mode 100644 index 0000000..6069de9 --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java @@ -0,0 +1,87 @@ +package com.yahoo.labs.flink; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import com.github.javacliparser.ClassOption; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection; +import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; +import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory; +import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem; +import com.yahoo.labs.flink.topology.impl.FlinkStream; +import com.yahoo.labs.flink.topology.impl.FlinkTopology; +import com.yahoo.labs.samoa.tasks.Task; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + + +/** + * Main class to run a SAMOA on Apache Flink + */ +public class FlinkDoTask { + + private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class); + + + public static void main(String[] args) throws Exception { + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + args = tmpArgs.toArray(new String[0]); + + // Init Task + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task; + try { + task = ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Failed to initialize the task: ", e); + System.out.println("Failed to initialize the task: " + e); + return; + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + task.setFactory(new FlinkComponentFactory(env)); + task.init(); + + logger.debug("Building Flink topology..."); + ((FlinkTopology) 
task.getTopology()).build(); + + logger.debug("Submitting the job..."); + env.execute(); + + } + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java new file mode 100644 index 0000000..a832ee9 --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java @@ -0,0 +1,99 @@ +package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +/** + * This class contains all logic needed in order to mark circles in job graphs explicitly such as + * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological + * (DFS) order. 
+ * + */ +public class CircleDetection { + private int[] index; + private int[] lowLink; + private int counter; + private Stack<Integer> stack; + private List<List<Integer>> scc; + List<Integer>[] graph; + + + public CircleDetection() { + stack = new Stack<Integer>(); + scc = new ArrayList<>(); + } + + public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) { + graph = adjacencyList; + index = new int[adjacencyList.length]; + lowLink = new int[adjacencyList.length]; + counter = 0; + + //initialize index and lowLink as "undefined"(=-1) + for (int j = 0; j < graph.length; j++) { + index[j] = -1; + lowLink[j] = -1; + } + for (int v = 0; v < graph.length; v++) { + if (index[v] == -1) { //undefined. + findSCC(v); + } + } + return scc; + } + + private void findSCC(int node) { + index[node] = counter; + lowLink[node] = counter; + counter++; + stack.push(node); + + for (int neighbor : graph[node]) { + if (index[neighbor] == -1) { + findSCC(neighbor); + lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]); + } else if (stack.contains(neighbor)) { //if neighbor has been already visited + lowLink[node] = Math.min(lowLink[node], index[neighbor]); + List<Integer> sccComponent = new ArrayList<Integer>(); + int w; + do { + w = stack.pop(); + sccComponent.add(w); + } while (neighbor != w); + //add neighbor again, just in case it is a member of another circle + stack.add(neighbor); + scc.add(sccComponent); + } + + } + if (lowLink[node] == index[node]) { + int w; + do { + w = stack.pop(); + } while (node != w); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java new 
file mode 100644 index 0000000..fe1b960 --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java @@ -0,0 +1,69 @@ +package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + + +import com.yahoo.labs.flink.topology.impl.SamoaType; +import com.yahoo.labs.samoa.core.ContentEvent; +import com.yahoo.labs.samoa.utils.PartitioningScheme; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.DataStream; + +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; + +public class Utils { + + public static TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO); + + public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) { + switch (partitioning) { + case BROADCAST: + return stream.broadcast(); + case GROUP_BY_KEY: + return stream.groupBy(new KeySelector<SamoaType, String>() { + 
@Override + public String getKey(SamoaType samoaType) throws Exception { + return samoaType.f0; + } + }); + case SHUFFLE: + default: + return stream.shuffle(); + } + } + + public static FilterFunction<SamoaType> getFilter(final String streamID) { + return new FilterFunction<SamoaType>() { + @Override + public boolean filter(SamoaType o) throws Exception { + return o.f2.equals(streamID); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java new file mode 100644 index 0000000..70a7838 --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java @@ -0,0 +1,68 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + + +import org.apache.flink.streaming.api.datastream.DataStream; + +/** + * Common interface of FlinkEntranceProcessingItem and FlinkProcessingItem + */ +public interface FlinkComponent { + + /** + * An initiation of the node. 
It should create the right invokables and apply the appropriate + * stream transformations + */ + public void initialise(); + + /** + * This check is needed in order to determine whether all requirements for a Flink Component + * (DataStream) are satisfied in order to initialise it. This is necessary in this integration + * since Flink Streaming applies eager datastream generation based on transformations. + * + * @return + */ + public boolean canBeInitialised(); + + /** + * + * @return + */ + public boolean isInitialised(); + + /** + * The wrapped Flink DataStream generated by this Flink component. Mind that the component + * should first be initialised in order to have a generated DataStream + * + * @return + */ + public DataStream<SamoaType> getOutStream(); + + /** + * A unique component id + * + * @return + */ + public int getComponentId(); + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java new file mode 100644 index 0000000..fca0c1a --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java @@ -0,0 +1,66 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.core.Processor; +import com.yahoo.labs.samoa.topology.*; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * An implementation of SAMOA's ComponentFactory for Apache Flink + */ +public class FlinkComponentFactory implements ComponentFactory { + + private StreamExecutionEnvironment env; + + public FlinkComponentFactory(StreamExecutionEnvironment env) { + this.env = env; + } + + @Override + public ProcessingItem createPi(Processor processor) { + return new FlinkProcessingItem(env, processor); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new FlinkProcessingItem(env, processor, parallelism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new FlinkEntranceProcessingItem(env, entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + if (sourcePi instanceof FlinkProcessingItem) + return ((FlinkProcessingItem) sourcePi).createStream(); + else return new FlinkStream((FlinkComponent) sourcePi); + } + + @Override + public Topology createTopology(String topologyName) { + return new FlinkTopology(topologyName, env); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java new file mode 100644 index 0000000..5dca509 --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java @@ -0,0 +1,101 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + + +import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; +import com.yahoo.labs.samoa.core.EntranceProcessor; +import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.source.RichSourceFunction; +import org.apache.flink.util.Collector; + +import java.io.Serializable; + +public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem + implements FlinkComponent, Serializable { + + private transient StreamExecutionEnvironment env; + private transient DataStream outStream; + + + public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, EntranceProcessor proc) { + super(proc); + this.env = env; + } + + @Override + public void initialise() { + final EntranceProcessor proc = getProcessor(); + final String streamId = getOutputStream().getStreamId(); + final int compID = getComponentId(); + + + outStream = env.addSource(new RichSourceFunction<SamoaType>() { + volatile boolean canceled; + EntranceProcessor entrProc = proc; + String id = streamId; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + entrProc.onCreate(compID); + } + + @Override + public void run(Collector<SamoaType> collector) throws Exception { + while (!canceled && entrProc.hasNext()) { + collector.collect(SamoaType.of(entrProc.nextEvent(), id)); + } + } + + @Override + public void cancel() { + canceled = true; + } + },Utils.tempTypeInfo); + + ((FlinkStream) getOutputStream()).initialise(); + } + + + @Override + public boolean canBeInitialised() { + return true; + } + + @Override + public boolean isInitialised() { + return outStream != null; + } + + @Override + public int getComponentId() { + return -1; // dummy number shows that it comes from an Entrance 
PI + } + + @Override + public DataStream getOutStream() { + return outStream; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java new file mode 100644 index 0000000..f92182e --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java @@ -0,0 +1,248 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * A SAMOA {@link ProcessingItem} that runs its {@link Processor} inside a custom Flink streaming
+ * operator.
+ *
+ * <p>The operator consumes {@link SamoaType} records and hands each record's event to the
+ * processor. Events the processor emits through {@link #putToStream} are tagged with the target
+ * stream id and written to the operator's single output; each downstream {@link FlinkStream}
+ * selects its own events from that output by stream id.
+ */
+public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class);
+  /** Wait time passed to {@code iterate()} when this item is the head of a topology loop. */
+  public static final int MAX_WAIT_TIME_MILLIS = 10000;
+
+  private final Processor processor;
+  private final transient StreamExecutionEnvironment env;
+  private final SamoaDelegateFunction fun;
+  // Topology-building state; transient, so it is not carried along when the operator is serialized.
+  private transient DataStream<SamoaType> inStream;
+  private transient DataStream<SamoaType> outStream;
+  private transient List<FlinkStream> outputStreams = Lists.newArrayList();
+  // Each entry is (input stream, partitioning scheme, id of the stream's source item).
+  private transient List<Tuple3<FlinkStream, PartitioningScheme, Integer>> inputStreams = Lists.newArrayList();
+  private int parallelism;
+  // JVM-wide counter that hands out component ids; it is never reset.
+  private static int numberOfPIs = 0;
+  private int piID;
+  // Indices of the topology loops this item belongs to; empty when it is part of no loop.
+  private List<Integer> circleId;
+  private boolean onIteration;
+
+  public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) {
+    this(env, proc, 1);
+  }
+
+  public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc, int parallelism) {
+    this(env, new SamoaDelegateFunction(proc), proc, parallelism);
+  }
+
+  public FlinkProcessingItem(StreamExecutionEnvironment env, SamoaDelegateFunction fun, Processor proc, int parallelism) {
+    super(fun);
+    this.env = env;
+    this.fun = fun;
+    this.processor = proc;
+    this.parallelism = parallelism;
+    this.piID = numberOfPIs++;
+    this.circleId = new ArrayList<>();
+  }
+
+  /** Creates a new output stream sourced by this item and registers it. */
+  public Stream createStream() {
+    FlinkStream generatedStream = new FlinkStream(this);
+    outputStreams.add(generatedStream);
+    return generatedStream;
+  }
+
+  /** Emits {@code data} on this operator's output, tagged with the id of {@code targetStream}. */
+  public void putToStream(ContentEvent data, Stream targetStream) {
+    collector.collect(SamoaType.of(data, targetStream.getStreamId()));
+  }
+
+  /** Opens the operator and calls the processor's {@code onCreate} hook with this item's id. */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.processor.onCreate(getComponentId());
+  }
+
+  /**
+   * Builds this item's Flink operator. It merges every input stream that is already initialised
+   * (each subscribed with its partitioning scheme). If this item heads a loop, it then opens an
+   * iteration on the merged stream. Finally it applies this item as a transformation with the
+   * configured parallelism.
+   *
+   * @throws IllegalStateException if subscribing to an input stream fails, or if none of the
+   *     input streams is initialised yet
+   */
+  @Override
+  public void initialise() {
+    for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
+      if (!inputStream.f0.isInitialised()) {
+        // Not built yet, e.g. a loop back-edge that FlinkTopology closes later with closeWith().
+        continue;
+      }
+      try {
+        DataStream toBeMerged = Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1);
+        if (inStream == null) {
+          inStream = toBeMerged;
+        } else {
+          inStream = inStream.merge(toBeMerged);
+        }
+      } catch (RuntimeException e) {
+        // Fail the topology build with context instead of terminating the whole JVM.
+        throw new IllegalStateException("Processing item " + piID
+            + " could not subscribe to input stream " + inputStream.f0.getStreamId(), e);
+      }
+    }
+
+    if (inStream == null) {
+      throw new IllegalStateException("Processing item " + piID
+          + " has no initialised input stream to build its operator from");
+    }
+    if (onIteration) {
+      inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS);
+    }
+    outStream = inStream.transform("samoaProcessor", Utils.tempTypeInfo, this).setParallelism(parallelism);
+  }
+
+  /** Initialises every output stream created through {@link #createStream()}. */
+  public void initialiseStreams() {
+    for (FlinkStream stream : this.getOutputStreams()) {
+      stream.initialise();
+    }
+  }
+
+  /** Returns true once every connected input stream has been initialised. */
+  @Override
+  public boolean canBeInitialised() {
+    for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
+      if (!inputStream.f0.isInitialised()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isInitialised() {
+    return outStream != null;
+  }
+
+  @Override
+  public Processor getProcessor() {
+    return processor;
+  }
+
+  /** Hands the event (field {@code f1}) of each incoming record to the processor until readNext() returns null. */
+  @Override
+  public void invoke() throws Exception {
+    while (readNext() != null) {
+      SamoaType t = nextObject;
+      fun.processEvent(t.f1);
+    }
+  }
+
+  @Override
+  public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+    return connectInputStream(inputStream, PartitioningScheme.SHUFFLE);
+  }
+
+  @Override
+  public ProcessingItem connectInputKeyStream(Stream inputStream) {
+    return connectInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY);
+  }
+
+  @Override
+  public ProcessingItem connectInputAllStream(Stream inputStream) {
+    return connectInputStream(inputStream, PartitioningScheme.BROADCAST);
+  }
+
+  // Records an input stream with its partitioning scheme and the id of its source item.
+  private ProcessingItem connectInputStream(Stream inputStream, PartitioningScheme scheme) {
+    FlinkStream flinkStream = (FlinkStream) inputStream;
+    inputStreams.add(new Tuple3<>(flinkStream, scheme, flinkStream.getSourcePiId()));
+    return this;
+  }
+
+  @Override
+  public int getParallelism() {
+    return parallelism;
+  }
+
+  public void setParallelism(int parallelism) {
+    this.parallelism = parallelism;
+  }
+
+  public List<FlinkStream> getOutputStreams() {
+    return outputStreams;
+  }
+
+  public DataStream<SamoaType> getOutStream() {
+    return this.outStream;
+  }
+
+  public void setOutStream(DataStream outStream) {
+    this.outStream = outStream;
+  }
+
+  @Override
+  public int getComponentId() {
+    return piID;
+  }
+
+  /** Returns true if this item belongs to at least one topology loop. */
+  public boolean isPartOfCircle() {
+    return !this.circleId.isEmpty();
+  }
+
+  public List<Integer> getCircleIds() {
+    return circleId;
+  }
+
+  /** Marks this item as a member of the loop with the given index. */
+  public void addPItoLoop(int piId) {
+    this.circleId.add(piId);
+  }
+
+  public DataStream<SamoaType> getInStream() {
+    return inStream;
+  }
+
+  public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> getInputStreams() {
+    return inputStreams;
+  }
+
+  public void setOnIteration(boolean onIteration) {
+    this.onIteration = onIteration;
+  }
+
+  public boolean isOnIteration() {
+    return onIteration;
+  }
+
+  /** Serializable Flink function that forwards events to the wrapped SAMOA processor. */
+  static class SamoaDelegateFunction implements Function, Serializable {
+    private final Processor proc;
+
+    SamoaDelegateFunction(Processor proc) {
+      this.proc = proc;
+    }
+
+    public void processEvent(ContentEvent event) {
+      proc.process(event);
+    }
+  }
+
+  /** Returns the first input stream whose source item has id {@code sourceID}, or null if none. */
+  public FlinkStream getInputStreamBySourceID(int sourceID) {
+    for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams : inputStreams) {
+      if (fstreams.f2 == sourceID) {
+        return fstreams.f0;
+      }
+    }
+    return null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java new file mode 100644 index 0000000..c5cb0ed --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java @@ -0,0 +1,94 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.io.Serializable;
+
+
+/**
+ * A SAMOA stream backed by an Apache Flink {@link DataStream}.
+ *
+ * <p>A {@link FlinkProcessingItem} writes the events of all its streams to one operator output,
+ * so a stream sourced by such an item is that output filtered down to this stream's id. The
+ * output of any other kind of source is used unfiltered.
+ */
+public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
+
+  // JVM-wide counter used to build stream ids ("stream-0", "stream-1", ...); never reset.
+  private static int outputCounter = 0;
+  private FlinkComponent procItem;
+  private transient DataStream<SamoaType> dataStream;
+  private int sourcePiId;
+  private String flinkStreamId;
+
+  /** Creates a stream emitted by {@code sourcePi} and gives it the next free stream id. */
+  public FlinkStream(FlinkComponent sourcePi) {
+    this.procItem = sourcePi;
+    this.sourcePiId = sourcePi.getComponentId();
+    String id = "stream-" + outputCounter;
+    setStreamId(id);
+    this.flinkStreamId = id;
+    outputCounter++;
+  }
+
+  /** Derives this stream's DataStream from the output of its source component. */
+  @Override
+  public void initialise() {
+    if (!(procItem instanceof FlinkProcessingItem)) {
+      dataStream = procItem.getOutStream();
+      return;
+    }
+    FlinkProcessingItem sourceItem = (FlinkProcessingItem) procItem;
+    dataStream = procItem.getOutStream()
+        .filter(Utils.getFilter(getStreamId()))
+        .setParallelism(sourceItem.getParallelism());
+  }
+
+  /** A stream can be built as soon as its source component is built. */
+  @Override
+  public boolean canBeInitialised() {
+    return procItem.isInitialised();
+  }
+
+  @Override
+  public boolean isInitialised() {
+    return dataStream != null;
+  }
+
+  @Override
+  public DataStream getOutStream() {
+    return dataStream;
+  }
+
+  /** Emits {@code event} on this stream; the source must be a {@link FlinkProcessingItem}. */
+  @Override
+  public void put(ContentEvent event) {
+    FlinkProcessingItem sourceItem = (FlinkProcessingItem) procItem;
+    sourceItem.putToStream(event, this);
+  }
+
+  /** Always -1: a stream is not a processing item and has no component id of its own. */
+  @Override
+  public int getComponentId() {
+    return -1;
+  }
+
+  /** Returns the component id of the item that emits this stream. */
+  public int getSourcePiId() {
+    return sourcePiId;
+  }
+
+  @Override
+  public String getStreamId() {
+    return flinkStreamId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java new file mode 100644 index 0000000..f04d792 --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java @@ -0,0 +1,185 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A SAMOA topology on Apache Flink.
+ *
+ * <p>A Samoa-Flink streaming topology is a DAG of ProcessingItems encapsulated within custom
+ * operators. Each operator tags the streams on its output, and the streams are filtered so that
+ * every event reaches the right operator. Building a Flink topology from a SAMOA task means
+ * invoking all these stream transformations and then marking and initiating the loops in the graph.
+ * This is needed because Flink only allows explicit loops, started with {@code iterate()} and
+ * closed with {@code closeWith()}. The topology is therefore built incrementally from the sources,
+ * and each loop is initialised as an explicit iteration.
+ */
+public class FlinkTopology extends AbstractTopology {
+
+  private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
+  // NOTE: static, so every FlinkTopology instance shares it and the latest constructor call wins.
+  public static StreamExecutionEnvironment env;
+  /** Detected loops. Each is stored tail-first: index 0 is the tail, the last element the head. */
+  public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>();
+  /** Component id of each loop's tail, indexed like {@link #topologyLoops}. */
+  public List<Integer> backEdges = new ArrayList<Integer>();
+
+  private static final Predicate<FlinkProcessingItem> NOT_INITIALISED = new Predicate<FlinkProcessingItem>() {
+    @Override
+    public boolean apply(FlinkProcessingItem flinkComponent) {
+      return !flinkComponent.isInitialised();
+    }
+  };
+
+  public FlinkTopology(String name, StreamExecutionEnvironment env) {
+    super(name);
+    FlinkTopology.env = env;
+  }
+
+  public StreamExecutionEnvironment getEnvironment() {
+    return env;
+  }
+
+  /** Detects loops, then initialises entrance items followed by all processing items. */
+  public void build() {
+    markCircles();
+    for (EntranceProcessingItem src : getEntranceProcessingItems()) {
+      ((FlinkEntranceProcessingItem) src).initialise();
+    }
+    initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
+  }
+
+  /**
+   * Initialises the given items in passes until all are built. An item outside any loop is built
+   * once its inputs are ready. An item inside a loop is built together with a loop that is ready.
+   *
+   * @throws IllegalStateException if a pass builds nothing; otherwise the next pass would see
+   *     exactly the same state and the build would never finish
+   */
+  private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
+    ImmutableList<FlinkProcessingItem> pending = flinkComponents;
+    while (!pending.isEmpty()) {
+      for (FlinkProcessingItem comp : pending) {
+        if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) {
+          comp.initialise();
+          comp.initialiseStreams();
+        } else if (comp.isPartOfCircle() && !comp.isInitialised()) {
+          // The component is part of one or more loops: build a loop once all its external inputs exist.
+          for (Integer circle : comp.getCircleIds()) {
+            if (checkCircleReady(circle)) {
+              logger.debug("Circle: {} can be initialised", circle);
+              initialiseCircle(circle);
+            } else {
+              logger.debug("Circle cannot be initialised");
+            }
+          }
+        }
+      }
+
+      ImmutableList<FlinkProcessingItem> remaining = ImmutableList.copyOf(Iterables.filter(pending, NOT_INITIALISED));
+      if (remaining.size() == pending.size()) {
+        List<Integer> stuckIds = new ArrayList<>();
+        for (FlinkProcessingItem pi : remaining) {
+          stuckIds.add(pi.getComponentId());
+        }
+        throw new IllegalStateException("Cannot initialise processing items " + stuckIds
+            + ": none of them became ready in the last pass");
+      }
+      pending = remaining;
+    }
+  }
+
+  /**
+   * Detects the loops formed by this topology's FlinkProcessingItems (entrance items are not
+   * graph nodes). Each loop is appended to {@link #topologyLoops}, its tail's id goes to
+   * {@link #backEdges}, and every member item is tagged with the loop's index.
+   */
+  private void markCircles() {
+    List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
+    int size = pis.size();
+
+    // Component ids come from a JVM-wide counter that is never reset, so they are only 0..size-1
+    // for the first topology built in a JVM. Map ids to dense graph indices, ordered by id, so that
+    // in that case each index equals the component id.
+    List<Integer> sortedIds = new ArrayList<>(size);
+    for (FlinkProcessingItem pi : pis) {
+      sortedIds.add(pi.getComponentId());
+    }
+    Collections.sort(sortedIds);
+    Map<Integer, Integer> indexOf = new HashMap<>();
+    for (int i = 0; i < size; i++) {
+      indexOf.put(sortedIds.get(i), i);
+    }
+
+    @SuppressWarnings("unchecked") // generic array creation; CircleDetection takes an array of lists
+    List<Integer>[] graph = new List[size];
+    FlinkProcessingItem[] processingItems = new FlinkProcessingItem[size];
+    for (int i = 0; i < size; i++) {
+      graph[i] = new ArrayList<Integer>();
+    }
+    // Add an edge source -> target for each input stream whose source is one of these items.
+    // Other sources (presumably entrance items, reported with id -1) cannot be part of a loop.
+    for (FlinkProcessingItem pi : pis) {
+      int target = indexOf.get(pi.getComponentId());
+      processingItems[target] = pi;
+      for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
+        Integer source = indexOf.get(is.f2);
+        if (source != null) {
+          graph[source].add(target);
+        }
+      }
+    }
+    for (List<Integer> adjacency : graph) {
+      logger.debug("{}", adjacency);
+    }
+
+    List<List<Integer>> circles = new CircleDetection().getCircles(graph);
+
+    // Record each loop and tag its member items with the loop's index.
+    for (List<Integer> c : circles) {
+      List<FlinkProcessingItem> circle = new ArrayList<>();
+      for (Integer it : c) {
+        circle.add(processingItems[it]);
+        processingItems[it].addPItoLoop(topologyLoops.size());
+      }
+      topologyLoops.add(circle);
+      backEdges.add(circle.get(0).getComponentId());
+    }
+    logger.debug("Circles detected in the topology: {}", circles);
+  }
+
+  /**
+   * Returns true when every input stream entering the loop is initialised. Two kinds of
+   * uninitialised input are allowed: streams coming from a loop member, and streams from a
+   * loop tail.
+   */
+  private boolean checkCircleReady(int circleId) {
+    List<Integer> circleIds = new ArrayList<>();
+    for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
+      circleIds.add(pi.getComponentId());
+    }
+
+    for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) {
+      for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
+        boolean blocking = !inputStream.f0.isInitialised()
+            && !circleIds.contains(inputStream.f2)
+            && !backEdges.contains(inputStream.f2);
+        if (blocking) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Builds one loop as a Flink iteration. The head's input becomes the iterative stream, the rest
+   * of the loop is built down to the tail, and the iteration is closed with the tail's stream.
+   *
+   * @throws IllegalStateException if the head has no input stream coming from the tail
+   */
+  private void initialiseCircle(int circleId) {
+    List<FlinkProcessingItem> loop = topologyLoops.get(circleId);
+    FlinkProcessingItem tail = loop.get(0);
+    FlinkProcessingItem head = loop.get(loop.size() - 1);
+
+    // Build the head first; its input stream is the starting point of the iteration.
+    if (!head.isInitialised()) {
+      head.setOnIteration(true);
+      head.initialise();
+      head.initialiseStreams();
+    }
+
+    // Build the remaining nodes, from the one after the head down to the tail.
+    for (int node = loop.size() - 2; node >= 0; node--) {
+      loop.get(node).initialise();
+      loop.get(node).initialiseStreams();
+    }
+
+    FlinkStream backEdge = head.getInputStreamBySourceID(tail.getComponentId());
+    if (backEdge == null) {
+      throw new IllegalStateException("Loop " + circleId + ": head " + head.getComponentId()
+          + " has no input stream from tail " + tail.getComponentId());
+    }
+    ((IterativeDataStream) head.getInStream()).closeWith(backEdge.getOutStream());
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/64ef7a92/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java new file mode 100644 index 0000000..16d050a --- /dev/null +++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java @@ -0,0 +1,42 @@ +package com.yahoo.labs.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * #L%
+ */
+
+
+
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * The record type exchanged between SAMOA operators on Flink. It is a tuple of
+ * (key, event, target stream id), available as fields {@code f0}, {@code f1} and {@code f2}.
+ */
+public class SamoaType extends Tuple3<String, ContentEvent, String> {
+
+  /** Empty tuple; presumably needed by Flink's tuple serialization (confirm before removing). */
+  public SamoaType() {
+    super();
+  }
+
+  private SamoaType(String key, ContentEvent event, String streamId) {
+    super(key, event, streamId);
+  }
+
+  /**
+   * Wraps {@code event} for delivery on {@code streamId}. The record is keyed by the event's key,
+   * or by {@code "none"} when the event has no key.
+   */
+  public static SamoaType of(ContentEvent event, String streamId) {
+    String key;
+    if (event.getKey() != null) {
+      key = event.getKey();
+    } else {
+      key = "none";
+    }
+    return new SamoaType(key, event, streamId);
+  }
+}
\ No newline at end of file
