http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java new file mode 100644 index 0000000..eaba37d --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java @@ -0,0 +1,307 @@ +package org.apache.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.BufferedWriter; +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.Iterator; +import java.util.Set; +import java.util.HashSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.Builder; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.samoa.streams.fs.HDFSFileStreamSource; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class HDFSFileStreamSourceTest { + + private static final String[] HOSTS = { "localhost" }; + private static final String BASE_DIR = "/minidfsTest"; + private static final int NUM_FILES_IN_DIR = 4; + private static final int NUM_NOISE_FILES_IN_DIR = 2; + + private HDFSFileStreamSource streamSource; + + private Configuration config; + private MiniDFSCluster hdfsCluster; + private String hdfsURI; + + @Before + public void setUp() throws Exception { + // Start MiniDFSCluster + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1) + .format(true); + hdfsCluster = builder.build(); + hdfsCluster.waitActive(); + hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort(); + + // Construct stream source + streamSource = new HDFSFileStreamSource(); + + // General config + config = new Configuration(); + config.set("fs.defaultFS", hdfsURI); + } + + @After + public void tearDown() throws Exception { + hdfsCluster.shutdown(); + } + + /* + * Init tests + */ + @Test + public void testInitWithSingleFileAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + streamSource.init(config, BASE_DIR + "/1.txt", "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertTrue("Incorrect file in filePaths.", + fn.equals(BASE_DIR + "/1.txt") || 
fn.equals(hdfsURI + BASE_DIR + "1.txt")); + } + + @Test + public void testInitWithSingleFileAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + streamSource.init(config, BASE_DIR + "/1.txt", null); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertTrue("Incorrect file in filePaths.", + fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt")); + } + + @Test + public void testInitWithFolderAndExtension() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt"; + filenames.add(targetFn); + filenames.add(hdfsURI + targetFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + @Test + public void testInitWithFolderAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, null); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String targetFn = BASE_DIR + "/" + Integer.toString(i); + filenames.add(targetFn); + filenames.add(hdfsURI + targetFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + /* + * getNextInputStream tests + */ + @Test + public void testGetNextInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // call getNextInputStream & assertions + Set<String> contents = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + contents.add(Integer.toString(i)); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:" + i + ioe.getMessage()); + } + assertTrue("File content is incorrect.", contents.contains(inputRead)); + Iterator<String> it = contents.iterator(); + while (it.hasNext()) { + if (it.next().equals(inputRead)) { + it.remove(); + break; + } + } + } + + // assert that another call to getNextInputStream will return null + assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream()); + } + + /* + * getCurrentInputStream tests + */ + public void testGetCurrentInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, 
"txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // call getNextInputStream, getCurrentInputStream & assertions + for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list + InputStream inStream1 = streamSource.getNextInputStream(); + InputStream inStream2 = streamSource.getCurrentInputStream(); + assertSame("Incorrect current input stream.", inStream1, inStream2); + } + } + + /* + * reset tests + */ + public void testReset() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(config, BASE_DIR, "txt"); + + // Get the first input string + InputStream firstInStream = streamSource.getNextInputStream(); + String firstInput = null; + assertNotNull("Unexpected end of input stream list.", firstInStream); + + BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); + try { + firstInput = rd1.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + + // call getNextInputStream a few times + streamSource.getNextInputStream(); + + // call reset, call next, assert that output is 1 (the first file) + try { + streamSource.reset(); + } catch (IOException ioe) { + fail("Fail resetting stream source." + ioe.getMessage()); + } + + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd2.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + assertEquals("File content is incorrect.", firstInput, inputRead); + } + + private void writeSimpleFiles(String path, String ext, int numOfFiles) { + // get filesystem + FileSystem dfs; + try { + dfs = hdfsCluster.getFileSystem(); + } catch (IOException ioe) { + fail("Could not access MiniDFSCluster" + ioe.getMessage()); + return; + } + + // create basedir + Path basedir = new Path(path); + try { + dfs.mkdirs(basedir); + } catch (IOException ioe) { + fail("Could not create DIR:" + path + "\n" + ioe.getMessage()); + return; + } + + // write files + for (int i = 1; i <= numOfFiles; i++) { + String fn = null; + if (ext != null) { + fn = Integer.toString(i) + "." + ext; + } else { + fn = Integer.toString(i); + } + + try { + OutputStream fin = dfs.create(new Path(path, fn)); + BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin)); + wr.write(Integer.toString(i)); + wr.close(); + fin.close(); + } catch (IOException ioe) { + fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage()); + } + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java new file mode 100644 index 0000000..374ebd1 --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java @@ -0,0 +1,277 @@ +package org.apache.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.BufferedWriter; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.lang.SecurityException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.commons.io.FileUtils; +import org.apache.samoa.streams.fs.LocalFileStreamSource; + +public class LocalFileStreamSourceTest { + private static final String BASE_DIR = "localfsTest"; + private static final int NUM_FILES_IN_DIR = 4; + private static final int NUM_NOISE_FILES_IN_DIR = 2; + + private LocalFileStreamSource streamSource; + + @Before + public void setUp() throws Exception { + streamSource = new LocalFileStreamSource(); + + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(BASE_DIR)); + } + + @Test + public void testInitWithSingleFileAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + File inFile = new File(BASE_DIR, "1.txt"); + String inFilePath = inFile.getAbsolutePath(); + streamSource.init(inFilePath, "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertEquals("Incorrect file in filePaths.", inFilePath, fn); + } + + @Test + public void testInitWithSingleFileAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, "txt", 1); + + // init with path to input file + File inFile = new File(BASE_DIR, "1.txt"); + String inFilePath = inFile.getAbsolutePath(); + streamSource.init(inFilePath, null); + + // assertions + assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); + String fn = streamSource.getFilePathAt(0); + assertEquals("Incorrect file in filePaths.", inFilePath, fn); + } + + @Test + public void testInitWithFolderAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR, null, 
NUM_NOISE_FILES_IN_DIR); + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + File inDir = new File(BASE_DIR); + String inDirPath = inDir.getAbsolutePath(); + streamSource.init(inDirPath, "txt"); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String expectedFn = (new File(inDirPath, Integer.toString(i) + ".txt")).getAbsolutePath(); + filenames.add(expectedFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + @Test + public void testInitWithFolderAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR); + + // init with path to input dir + File inDir = new File(BASE_DIR); + String inDirPath = inDir.getAbsolutePath(); + streamSource.init(inDirPath, null); + + // assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); + Set<String> filenames = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + String expectedFn = (new File(inDirPath, Integer.toString(i))).getAbsolutePath(); + filenames.add(expectedFn); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); + } + } + + /* + * getNextInputStream tests + */ + @Test + public void testGetNextInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // call getNextInputStream & assertions + Set<String> contents = new HashSet<String>(); + for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { + contents.add(Integer.toString(i)); + } + for (int i = 0; i < NUM_FILES_IN_DIR; i++) { + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:" + i + ioe.getMessage()); + } + assertTrue("File content is incorrect.", contents.contains(inputRead)); + Iterator<String> it = contents.iterator(); + while (it.hasNext()) { + if (it.next().equals(inputRead)) { + it.remove(); + break; + } + } + } + + // assert that another call to getNextInputStream will return null + assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream()); + } + + /* + * getCurrentInputStream tests + */ + public void testGetCurrentInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // call getNextInputStream, getCurrentInputStream & assertions + for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list + InputStream inStream1 = streamSource.getNextInputStream(); + InputStream inStream2 = streamSource.getCurrentInputStream(); + assertSame("Incorrect current input stream.", inStream1, inStream2); + } + } + + /* + * reset tests + */ + public void testReset() { + // write input files & noise files + 
writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // Get the first input string + InputStream firstInStream = streamSource.getNextInputStream(); + String firstInput = null; + assertNotNull("Unexpected end of input stream list.", firstInStream); + + BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); + try { + firstInput = rd1.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + + // call getNextInputStream a few times + streamSource.getNextInputStream(); + + // call reset, call next, assert that output is 1 (the first file) + try { + streamSource.reset(); + } catch (IOException ioe) { + fail("Fail resetting stream source." + ioe.getMessage()); + } + + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.", inStream); + + BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd2.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + assertEquals("File content is incorrect.", firstInput, inputRead); + } + + private void writeSimpleFiles(String path, String ext, int numOfFiles) { + // Create folder + File folder = new File(path); + if (!folder.exists()) { + try { + folder.mkdir(); + } catch (SecurityException se) { + fail("Failed creating directory:" + path + se); + } + } + + // Write files + for (int i = 1; i <= numOfFiles; i++) { + String fn = null; + if (ext != null) { + fn = Integer.toString(i) + "." + ext; + } else { + fn = Integer.toString(i); + } + + try { + FileWriter fwr = new FileWriter(new File(path, fn)); + fwr.write(Integer.toString(i)); + fwr.close(); + } catch (IOException ioe) { + fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml index f00fe3c..f56ac70 100644 --- a/samoa-flink/pom.xml +++ b/samoa-flink/pom.xml @@ -42,7 +42,7 @@ <artifactId>samoa-flink</artifactId> <parent> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa</artifactId> <version>0.3.0-SNAPSHOT</version> </parent> @@ -51,7 +51,7 @@ <dependencies> <dependency> - <groupId>com.yahoo.labs.samoa</groupId> + <groupId>org.apache.samoa</groupId> <artifactId>samoa-api</artifactId> <version>${project.version}</version> <exclusions> @@ -105,7 +105,7 @@ </manifestEntries> <manifest> <addClasspath>true</addClasspath> - <mainClass>com.yahoo.labs.flink.FlinkDoTask</mainClass> + <mainClass>org.apache.samoa.flink.FlinkDoTask</mainClass> </manifest> </archive> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java deleted file mode 100644 index 6069de9..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.yahoo.labs.flink; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software 
Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.github.javacliparser.ClassOption; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection; -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; -import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory; -import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem; -import com.yahoo.labs.flink.topology.impl.FlinkStream; -import com.yahoo.labs.flink.topology.impl.FlinkTopology; -import com.yahoo.labs.samoa.tasks.Task; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - - -/** - * Main class to run a SAMOA on Apache Flink - */ -public class FlinkDoTask { - - private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class); - - - public static void main(String[] args) throws Exception { - List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - - args = tmpArgs.toArray(new String[0]); - - // Init Task - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } - logger.debug("Command line string = {}", cliString.toString()); - System.out.println("Command line string = " + cliString.toString()); - - Task task; - try { - task = ClassOption.cliStringToObject(cliString.toString(), Task.class, null); - logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName()); - } catch (Exception e) { - logger.error("Failed to initialize the task: ", e); - System.out.println("Failed to initialize the task: " + e); - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - task.setFactory(new FlinkComponentFactory(env)); - task.init(); - - logger.debug("Building Flink topology..."); - ((FlinkTopology) task.getTopology()).build(); - - logger.debug("Submitting the job..."); - env.execute(); - - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java deleted file mode 100644 index a832ee9..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file 
except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - - -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; - -/** - * This class contains all logic needed in order to mark circles in job graphs explicitly such as - * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological - * (DFS) order. - * - */ -public class CircleDetection { - private int[] index; - private int[] lowLink; - private int counter; - private Stack<Integer> stack; - private List<List<Integer>> scc; - List<Integer>[] graph; - - - public CircleDetection() { - stack = new Stack<Integer>(); - scc = new ArrayList<>(); - } - - public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) { - graph = adjacencyList; - index = new int[adjacencyList.length]; - lowLink = new int[adjacencyList.length]; - counter = 0; - - //initialize index and lowLink as "undefined"(=-1) - for (int j = 0; j < graph.length; j++) { - index[j] = -1; - lowLink[j] = -1; - } - for (int v = 0; v < graph.length; v++) { - if (index[v] == -1) { //undefined. - findSCC(v); - } - } - return scc; - } - - private void findSCC(int node) { - index[node] = counter; - lowLink[node] = counter; - counter++; - stack.push(node); - - for (int neighbor : graph[node]) { - if (index[neighbor] == -1) { - findSCC(neighbor); - lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]); - } else if (stack.contains(neighbor)) { //if neighbor has been already visited - lowLink[node] = Math.min(lowLink[node], index[neighbor]); - List<Integer> sccComponent = new ArrayList<Integer>(); - int w; - do { - w = stack.pop(); - sccComponent.add(w); - } while (neighbor != w); - //add neighbor again, just in case it is a member of another circle - stack.add(neighbor); - scc.add(sccComponent); - } - - } - if (lowLink[node] == index[node]) { - int w; - do { - w = stack.pop(); - } while (node != w); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java deleted file mode 100644 index fe1b960..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java +++ /dev/null @@ -1,69 +0,0 @@ -package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - - - -import com.yahoo.labs.flink.topology.impl.SamoaType; -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.util.List; - -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; - -public class Utils { - - public static TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO); - - public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) { - switch (partitioning) { - case BROADCAST: - return stream.broadcast(); - case GROUP_BY_KEY: - return stream.groupBy(new KeySelector<SamoaType, String>() { - @Override - public String getKey(SamoaType samoaType) throws Exception { - return samoaType.f0; - } - }); - case SHUFFLE: - default: - return stream.shuffle(); - } - } - - public static FilterFunction<SamoaType> getFilter(final String streamID) { - return new FilterFunction<SamoaType>() { - @Override - public boolean filter(SamoaType o) throws Exception { - return o.f2.equals(streamID); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java deleted file mode 100644 index 70a7838..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - - - -import org.apache.flink.streaming.api.datastream.DataStream; - -/** - * Common interface of FlinkEntranceProcessingItem and FlinkProcessingItem - */ -public interface FlinkComponent { - - /** - * An initiation of the node. 
It should create the right invokables and apply the appropriate - * stream transformations - */ - public void initialise(); - - /** - * This check is needed in order to determine whether all requirements for a Flink Component - * (DataStream) are satisfied in order to initialise it. This is necessary in this integration - * since Flink Streaming applies eager datastream generation based on transformations. - * - * @return - */ - public boolean canBeInitialised(); - - /** - * - * @return - */ - public boolean isInitialised(); - - /** - * The wrapped Flink DataStream generated by this Flink component. Mind that the component - * should first be initialised in order to have a generated DataStream - * - * @return - */ - public DataStream<SamoaType> getOutStream(); - - /** - * A unique component id - * - * @return - */ - public int getComponentId(); - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java deleted file mode 100644 index fca0c1a..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.*; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * An implementation of SAMOA's ComponentFactory for Apache Flink - */ -public class FlinkComponentFactory implements ComponentFactory { - - private StreamExecutionEnvironment env; - - public FlinkComponentFactory(StreamExecutionEnvironment env) { - this.env = env; - } - - @Override - public ProcessingItem createPi(Processor processor) { - return new FlinkProcessingItem(env, processor); - } - - @Override - public ProcessingItem createPi(Processor processor, int parallelism) { - return new FlinkProcessingItem(env, processor, parallelism); - } - - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - return new FlinkEntranceProcessingItem(env, entranceProcessor); - } - - @Override - public Stream createStream(IProcessingItem sourcePi) { - if (sourcePi instanceof FlinkProcessingItem) - return ((FlinkProcessingItem) sourcePi).createStream(); - else return new FlinkStream((FlinkComponent) sourcePi); - } - - @Override - public Topology createTopology(String topologyName) { - return new FlinkTopology(topologyName, env); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java deleted file mode 100644 index 5dca509..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - - -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.RichSourceFunction; -import org.apache.flink.util.Collector; - -import java.io.Serializable; - -public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem - implements FlinkComponent, Serializable { - - private transient StreamExecutionEnvironment env; - private transient DataStream outStream; - - - public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, EntranceProcessor proc) { - super(proc); - this.env = env; - } - - @Override - public void initialise() { - final EntranceProcessor proc = getProcessor(); - final String streamId = getOutputStream().getStreamId(); - final int compID = getComponentId(); - - - outStream = env.addSource(new RichSourceFunction<SamoaType>() { - volatile boolean canceled; - EntranceProcessor entrProc = proc; - String id = streamId; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - entrProc.onCreate(compID); - } - - @Override - public void run(Collector<SamoaType> collector) throws Exception { - while (!canceled && entrProc.hasNext()) { - collector.collect(SamoaType.of(entrProc.nextEvent(), id)); - } - } - - @Override - public void cancel() { - canceled = true; - } - },Utils.tempTypeInfo); - - ((FlinkStream) getOutputStream()).initialise(); - } - - - @Override - public boolean canBeInitialised() { - return true; - } - - @Override - public boolean isInitialised() { - return outStream != null; - } - - @Override - public int getComponentId() { - return -1; // dummy number shows that it comes from an Entrance PI - } - - @Override - public DataStream getOutStream() { - return outStream; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java deleted file mode 100644 index f92182e..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java +++ /dev/null @@ -1,248 +0,0 @@ -package com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - - -import com.google.common.collect.Lists; -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - - -public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable { - - private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class); - public static final int MAX_WAIT_TIME_MILLIS = 10000; - - private final Processor processor; - private final transient StreamExecutionEnvironment env; - private final SamoaDelegateFunction fun; - private transient DataStream<SamoaType> inStream; - private transient DataStream<SamoaType> outStream; - private transient List<FlinkStream> outputStreams = Lists.newArrayList(); - private transient List<Tuple3<FlinkStream, PartitioningScheme, Integer>> inputStreams = Lists.newArrayList(); - private int parallelism; - private static int numberOfPIs = 0; - private int piID; - private List<Integer> circleId; //check if we can refactor this - private boolean onIteration; - //private int circleId; //check if we can refactor this - - public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) { - this(env, proc, 1); - } - - public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc, int parallelism) { - this(env, new SamoaDelegateFunction(proc), proc, parallelism); - } - - public FlinkProcessingItem(StreamExecutionEnvironment env, SamoaDelegateFunction fun, Processor proc, int parallelism) { - super(fun); - this.env = env; - this.fun = fun; - this.processor = proc; - this.parallelism = parallelism; - this.piID = numberOfPIs++; - this.circleId = new ArrayList<Integer>() { - }; // if size equals 0, then it is part of no circle - } - - public Stream createStream() { - FlinkStream generatedStream = new FlinkStream(this); - outputStreams.add(generatedStream); - return generatedStream; - } - - public void putToStream(ContentEvent data, Stream targetStream) { - collector.collect(SamoaType.of(data, targetStream.getStreamId())); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.processor.onCreate(getComponentId()); - } - - @Override - public void initialise() { - for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) { - if (inputStream.f0.isInitialised()) { //if input stream is initialised - try { - DataStream toBeMerged = Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1); - if (inStream == null) { - inStream = toBeMerged; - } else { - inStream = inStream.merge(toBeMerged); - } - } catch (RuntimeException e) { - e.printStackTrace(); - System.exit(1); - } - } - } - - if (onIteration) { - inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS); - } - outStream = 
inStream.transform("samoaProcessor", Utils.tempTypeInfo, this).setParallelism(parallelism); - } - - public void initialiseStreams() { - for (FlinkStream stream : this.getOutputStreams()) { - stream.initialise(); - } - } - - @Override - public boolean canBeInitialised() { - for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) { - if (!inputStream.f0.isInitialised()) return false; - } - return true; - } - - @Override - public boolean isInitialised() { - return outStream != null; - } - - @Override - public Processor getProcessor() { - return processor; - } - - @Override - public void invoke() throws Exception { - while (readNext() != null) { - SamoaType t = nextObject; - fun.processEvent(t.f1); - } - } - - @Override - public ProcessingItem connectInputShuffleStream(Stream inputStream) { - inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.SHUFFLE, ((FlinkStream) inputStream).getSourcePiId())); - return this; - } - - @Override - public ProcessingItem connectInputKeyStream(Stream inputStream) { - inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.GROUP_BY_KEY, ((FlinkStream) inputStream).getSourcePiId())); - return this; - } - - @Override - public ProcessingItem connectInputAllStream(Stream inputStream) { - inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.BROADCAST, ((FlinkStream) inputStream).getSourcePiId())); - return this; - } - - @Override - public int getParallelism() { - return parallelism; - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public List<FlinkStream> getOutputStreams() { - return outputStreams; - } - - public DataStream<SamoaType> getOutStream() { - return this.outStream; - } - - public void setOutStream(DataStream outStream) { - this.outStream = outStream; - } - - @Override - public int getComponentId() { - return piID; - } - - public boolean isPartOfCircle() { - return this.circleId.size() > 0; - } - - public List<Integer> getCircleIds() { - return circleId; - } - - public void addPItoLoop(int piId) { - this.circleId.add(piId); - } - - public DataStream<SamoaType> getInStream() { - return inStream; - } - - public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> getInputStreams() { - return inputStreams; - } - - public void setOnIteration(boolean onIteration) { - this.onIteration = onIteration; - } - - public boolean isOnIteration() { - return onIteration; - } - - static class SamoaDelegateFunction implements Function, Serializable { - private final Processor proc; - - SamoaDelegateFunction(Processor proc) { - this.proc = proc; - } - - public void processEvent(ContentEvent event) { - proc.process(event); - } - } - - public FlinkStream getInputStreamBySourceID(int sourceID) { - for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams : inputStreams) { - if (fstreams.f2 == sourceID) { - return fstreams.f0; - } - } - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java deleted file mode 100644 index c5cb0ed..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java +++ /dev/null @@ -1,94 +0,0 @@ -package 
com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - - - -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.topology.AbstractStream; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.io.Serializable; - - -/** - * A stream for SAMOA based on Apache Flink's DataStream - */ -public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable { - - private static int outputCounter = 0; - private FlinkComponent procItem; - private transient DataStream<SamoaType> dataStream; - private int sourcePiId; - private String flinkStreamId; - - public FlinkStream(FlinkComponent sourcePi) { - this.procItem = sourcePi; - this.sourcePiId = sourcePi.getComponentId(); - setStreamId("stream-" + Integer.toString(outputCounter)); - flinkStreamId = "stream-" + Integer.toString(outputCounter); - outputCounter++; - } - - @Override - public void initialise() { - if (procItem instanceof FlinkProcessingItem) { - dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId())) - .setParallelism(((FlinkProcessingItem) procItem).getParallelism()); - } else - dataStream = procItem.getOutStream(); - } - - @Override - public boolean canBeInitialised() { - return procItem.isInitialised(); - } - - @Override - public boolean isInitialised() { - return dataStream != null; - } - - @Override - public DataStream getOutStream() { - return dataStream; - } - - @Override - public void put(ContentEvent event) { - ((FlinkProcessingItem) procItem).putToStream(event, this); - } - - @Override - public int getComponentId() { - return -1; //dummy number shows that it comes from a Stream - } - - public int getSourcePiId() { - return sourcePiId; - } - - @Override - public String getStreamId() { - return flinkStreamId; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java deleted file mode 100644 index f04d792..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java +++ /dev/null @@ -1,185 +0,0 @@ -package com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - - - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection; -import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils; -import com.yahoo.labs.samoa.topology.AbstractTopology; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.datastream.IterativeDataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; - -/** - * A SAMOA topology on Apache Flink - * - * A Samoa-Flink Streaming Topology is DAG of ProcessingItems encapsulated within custom operators. - * Streams are tagged and filtered in each operator's output so they can be routed to the right - * operator respectively. Building a Flink topology from a Samoa task involves invoking all these - * stream transformations and finally, marking and initiating loops in the graph. We have to do that - * since Flink only allows explicit loops in the topology started with 'iterate()' and closed with - * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the - * sources, mark loops and initialize them with explicit iterations. 
- * - */ -public class FlinkTopology extends AbstractTopology { - - private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class); - public static StreamExecutionEnvironment env; - public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>(); - public List<Integer> backEdges = new ArrayList<Integer>(); - - public FlinkTopology(String name, StreamExecutionEnvironment env) { - super(name); - this.env = env; - } - - public StreamExecutionEnvironment getEnvironment() { - return env; - } - - public void build() { - markCircles(); - for (EntranceProcessingItem src : getEntranceProcessingItems()) { - ((FlinkEntranceProcessingItem) src).initialise(); - } - initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class))); - } - - private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) { - if (flinkComponents.isEmpty()) return; - - for (FlinkProcessingItem comp : flinkComponents) { - if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) { - comp.initialise(); - comp.initialiseStreams(); - - }//if component is part of one or more circle - else if (comp.isPartOfCircle() && !comp.isInitialised()) { - for (Integer circle : comp.getCircleIds()) { - //check if circle can be initialized - if (checkCircleReady(circle)) { - logger.debug("Circle: " + circle + " can be initialised"); - initialiseCircle(circle); - } else { - logger.debug("Circle cannot be initialised"); - } - } - } - - } - initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() { - @Override - public boolean apply(FlinkProcessingItem flinkComponent) { - return !flinkComponent.isInitialised(); - } - }))); - } - - private void markCircles(){ - List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)); - List<Integer>[] graph = new List[pis.size()]; - FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()]; - - - for (int i=0;i<pis.size();i++) { - graph[i] = new ArrayList<Integer>(); - } - //construct the graph of the topology for the Processing Items (No entrance pi is included) - for (FlinkProcessingItem pi: pis) { - processingItems[pi.getComponentId()] = pi; - for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) { - if (is.f2 != -1) graph[is.f2].add(pi.getComponentId()); - } - } - for (int g=0;g<graph.length;g++) - logger.debug(graph[g].toString()); - - CircleDetection detCircles = new CircleDetection(); - List<List<Integer>> circles = detCircles.getCircles(graph); - - //update PIs, regarding being part of a circle. 
- for (List<Integer> c : circles){ - List<FlinkProcessingItem> circle = new ArrayList<>(); - for (Integer it : c){ - circle.add(processingItems[it]); - processingItems[it].addPItoLoop(topologyLoops.size()); - } - topologyLoops.add(circle); - backEdges.add(circle.get(0).getComponentId()); - } - logger.debug("Circles detected in the topology: " + circles); - } - - - private boolean checkCircleReady(int circleId) { - - List<Integer> circleIds = new ArrayList<>(); - - for (FlinkProcessingItem pi : topologyLoops.get(circleId)) { - circleIds.add(pi.getComponentId()); - } - //check that all incoming to the circle streams are initialised - for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) { - for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) { - //if a inputStream is not initialized AND source of inputStream is not in the circle or a tail of other circle - if ((!inputStream.f0.isInitialised()) && (!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2))) - return false; - } - } - return true; - } - - private void initialiseCircle(int circleId) { - //get the head and tail of circle - FlinkProcessingItem tail = topologyLoops.get(circleId).get(0); - FlinkProcessingItem head = topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1); - - //initialise source stream of the iteration, so as to use it for the iteration starting point - if (!head.isInitialised()) { - head.setOnIteration(true); - head.initialise(); - head.initialiseStreams(); - } - - //initialise all nodes after head - for (int node = topologyLoops.get(circleId).size() - 2; node >= 0; node--) { - topologyLoops.get(circleId).get(node).initialise(); - topologyLoops.get(circleId).get(node).initialiseStreams(); - } - - ((IterativeDataStream) head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream()); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java deleted file mode 100644 index 16d050a..0000000 --- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.yahoo.labs.flink.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - - - - -import com.yahoo.labs.samoa.core.ContentEvent; -import org.apache.flink.api.java.tuple.Tuple3; - -public class SamoaType extends Tuple3<String, ContentEvent, String> { - public SamoaType() { - super(); - } - - private SamoaType(String key, ContentEvent event, String streamId) { - super(key, event, streamId); - } - - public static SamoaType of(ContentEvent event, String streamId) { - String key = event.getKey() == null ? "none" : event.getKey(); - return new SamoaType(key, event, streamId); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java new file mode 100644 index 0000000..cd0b82c --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java @@ -0,0 +1,88 @@ +package org.apache.samoa.flink; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.ClassOption; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.samoa.flink.helpers.CircleDetection; +import org.apache.samoa.flink.helpers.Utils; +import org.apache.samoa.flink.topology.impl.FlinkComponentFactory; +import org.apache.samoa.flink.topology.impl.FlinkProcessingItem; +import org.apache.samoa.flink.topology.impl.FlinkStream; +import org.apache.samoa.flink.topology.impl.FlinkTopology; +import org.apache.samoa.tasks.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + + +/** + * Main class to run a SAMOA on Apache Flink + */ +public class FlinkDoTask { + + private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class); + + + public static void main(String[] args) throws Exception { + List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + + args = tmpArgs.toArray(new String[0]); + + // Init Task + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); + } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task; + try { + task = ClassOption.cliStringToObject(cliString.toString(), Task.class, null); + logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Failed to initialize the task: ", e); + System.out.println("Failed to initialize the task: " + e); + return; + } + + StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); + task.setFactory(new FlinkComponentFactory(env)); + task.init(); + + logger.debug("Building Flink topology..."); + ((FlinkTopology) task.getTopology()).build(); + + logger.debug("Submitting the job..."); + env.execute(); + + } + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java new file mode 100644 index 0000000..a5a3b9d --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java @@ -0,0 +1,99 @@ +package org.apache.samoa.flink.helpers; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; + +/** + * This class contains all logic needed in order to mark circles in job graphs explicitly such as + * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological + * (DFS) order. + * + */ +public class CircleDetection { + private int[] index; + private int[] lowLink; + private int counter; + private Stack<Integer> stack; + private List<List<Integer>> scc; + List<Integer>[] graph; + + + public CircleDetection() { + stack = new Stack<Integer>(); + scc = new ArrayList<>(); + } + + public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) { + graph = adjacencyList; + index = new int[adjacencyList.length]; + lowLink = new int[adjacencyList.length]; + counter = 0; + + //initialize index and lowLink as "undefined"(=-1) + for (int j = 0; j < graph.length; j++) { + index[j] = -1; + lowLink[j] = -1; + } + for (int v = 0; v < graph.length; v++) { + if (index[v] == -1) { //undefined. 
+ findSCC(v); + } + } + return scc; + } + + private void findSCC(int node) { + index[node] = counter; + lowLink[node] = counter; + counter++; + stack.push(node); + + for (int neighbor : graph[node]) { + if (index[neighbor] == -1) { + findSCC(neighbor); + lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]); + } else if (stack.contains(neighbor)) { //if neighbor has been already visited + lowLink[node] = Math.min(lowLink[node], index[neighbor]); + List<Integer> sccComponent = new ArrayList<Integer>(); + int w; + do { + w = stack.pop(); + sccComponent.add(w); + } while (neighbor != w); + //add neighbor again, just in case it is a member of another circle + stack.add(neighbor); + scc.add(sccComponent); + } + + } + if (lowLink[node] == index[node]) { + int w; + do { + w = stack.pop(); + } while (node != w); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java new file mode 100644 index 0000000..38b4bdc --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java @@ -0,0 +1,69 @@ +package org.apache.samoa.flink.helpers; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
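[Sketch] A second hedged example for CircleDetection, with an invented "figure eight" graph, showing why a node can end up in more than one reported circle: unlike textbook Tarjan, findSCC() above emits a component as soon as it meets an on-stack neighbour and then pushes that neighbour back.

    // Hypothetical graph sharing node 0: 0 -> 1 -> 0 and 0 -> 2 -> 0.
    List<Integer>[] g = new List[3];
    g[0] = Lists.newArrayList(1, 2);
    g[1] = Lists.newArrayList(0);
    g[2] = Lists.newArrayList(0);
    // getCircles(g) reports two loops, [1, 0] and [2, 0]: the shared back-edge target is
    // pushed back onto the stack after each loop is emitted, so it may belong to several
    // circles, which is why FlinkProcessingItem keeps a List of circle ids per PI.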
+ * #L% + */ + + + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.flink.topology.impl.SamoaType; +import org.apache.samoa.utils.PartitioningScheme; + +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; + +public class Utils { + + public static TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO); + + public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) { + switch (partitioning) { + case BROADCAST: + return stream.broadcast(); + case GROUP_BY_KEY: + return stream.groupBy(new KeySelector<SamoaType, String>() { + @Override + public String getKey(SamoaType samoaType) throws Exception { + return samoaType.f0; + } + }); + case SHUFFLE: + default: + return stream.shuffle(); + } + } + + public static FilterFunction<SamoaType> getFilter(final String streamID) { + return new FilterFunction<SamoaType>() { + @Override + public boolean filter(SamoaType o) throws Exception { + return o.f2.equals(streamID); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java new file mode 100644 index 0000000..b61f590 --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java @@ -0,0 +1,68 @@ +package org.apache.samoa.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + + +import org.apache.flink.streaming.api.datastream.DataStream; + +/** + * Common interface of FlinkEntranceProcessingItem and FlinkProcessingItem + */ +public interface FlinkComponent { + + /** + * An initiation of the node. It should create the right invokables and apply the appropriate + * stream transformations + */ + public void initialise(); + + /** + * This check is needed in order to determine whether all requirements for a Flink Component + * (DataStream) are satisfied in order to initialise it. This is necessary in this integration + * since Flink Streaming applies eager datastream generation based on transformations. 
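[Sketch] A short usage sketch for the two helpers in Utils above; the component variable and the stream id are placeholders. A PI's raw output carries events for all of its logical SAMOA streams, so a consumer first keeps the events tagged with the wanted stream id and then applies the requested partitioning.

    // Illustrative only: demultiplex one SAMOA stream out of a component's output
    // and subscribe to it with key-based partitioning.
    DataStream<SamoaType> raw = someFlinkComponent.getOutStream();               // placeholder component
    DataStream<SamoaType> oneStream = raw.filter(Utils.getFilter("stream-3"));   // keep events with f2 == "stream-3"
    DataStream keyed = Utils.subscribe(oneStream, PartitioningScheme.GROUP_BY_KEY); // partition by f0, the event key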
+ * + * @return + */ + public boolean canBeInitialised(); + + /** + * + * @return + */ + public boolean isInitialised(); + + /** + * The wrapped Flink DataStream generated by this Flink component. Mind that the component + * should first be initialised in order to have a generated DataStream + * + * @return + */ + public DataStream<SamoaType> getOutStream(); + + /** + * A unique component id + * + * @return + */ + public int getComponentId(); + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java new file mode 100644 index 0000000..93e4626 --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java @@ -0,0 +1,66 @@ +package org.apache.samoa.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.*; + +/** + * An implementation of SAMOA's ComponentFactory for Apache Flink + */ +public class FlinkComponentFactory implements ComponentFactory { + + private StreamExecutionEnvironment env; + + public FlinkComponentFactory(StreamExecutionEnvironment env) { + this.env = env; + } + + @Override + public ProcessingItem createPi(Processor processor) { + return new FlinkProcessingItem(env, processor); + } + + @Override + public ProcessingItem createPi(Processor processor, int parallelism) { + return new FlinkProcessingItem(env, processor, parallelism); + } + + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + return new FlinkEntranceProcessingItem(env, entranceProcessor); + } + + @Override + public Stream createStream(IProcessingItem sourcePi) { + if (sourcePi instanceof FlinkProcessingItem) + return ((FlinkProcessingItem) sourcePi).createStream(); + else return new FlinkStream((FlinkComponent) sourcePi); + } + + @Override + public Topology createTopology(String topologyName) { + return new FlinkTopology(topologyName, env); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java new file mode 100644 index 0000000..e00874b 
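[Sketch] A hypothetical wiring example for FlinkComponentFactory above, mirroring how a SAMOA task would drive a ComponentFactory when building its topology; the processor instances and parallelism are placeholders, not part of this patch.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkComponentFactory factory = new FlinkComponentFactory(env);

    Topology topology = factory.createTopology("demo-topology");
    EntranceProcessingItem source = factory.createEntrancePi(mySourceProcessor); // assumed EntranceProcessor
    Stream sourceStream = factory.createStream(source);                          // FlinkStream wrapping the entrance PI

    ProcessingItem learner = factory.createPi(myLearnerProcessor, 4);            // assumed Processor, parallelism 4
    learner.connectInputKeyStream(sourceStream);                                 // recorded as (stream, GROUP_BY_KEY, source id)
    // FlinkTopology.build() later initialises these components in dependency order.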
--- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java @@ -0,0 +1,101 @@ +package org.apache.samoa.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.source.RichSourceFunction; +import org.apache.flink.util.Collector; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.flink.helpers.Utils; +import org.apache.samoa.topology.AbstractEntranceProcessingItem; + +import java.io.Serializable; + +public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem + implements FlinkComponent, Serializable { + + private transient StreamExecutionEnvironment env; + private transient DataStream outStream; + + + public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, EntranceProcessor proc) { + super(proc); + this.env = env; + } + + @Override + public void initialise() { + final EntranceProcessor proc = getProcessor(); + final String streamId = getOutputStream().getStreamId(); + final int compID = getComponentId(); + + + outStream = env.addSource(new RichSourceFunction<SamoaType>() { + volatile boolean canceled; + EntranceProcessor entrProc = proc; + String id = streamId; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + entrProc.onCreate(compID); + } + + @Override + public void run(Collector<SamoaType> collector) throws Exception { + while (!canceled && entrProc.hasNext()) { + collector.collect(SamoaType.of(entrProc.nextEvent(), id)); + } + } + + @Override + public void cancel() { + canceled = true; + } + },Utils.tempTypeInfo); + + ((FlinkStream) getOutputStream()).initialise(); + } + + + @Override + public boolean canBeInitialised() { + return true; + } + + @Override + public boolean isInitialised() { + return outStream != null; + } + + @Override + public int getComponentId() { + return -1; // dummy number shows that it comes from an Entrance PI + } + + @Override + public DataStream getOutStream() { + return outStream; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java new file mode 100644 index 0000000..3f5431c --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java @@ -0,0 +1,249 @@ +package org.apache.samoa.flink.topology.impl; + +/* + * #%L + * SAMOA 
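[Sketch] The source function in FlinkEntranceProcessingItem above tags every event as a SamoaType, the Tuple3 of key, event and stream id whose old com.yahoo.labs copy is deleted earlier in this commit. A small illustrative sketch of that tagging; the processor instance and stream id are placeholders, and the "none" key fallback is the behaviour of the deleted copy, assumed to carry over to the re-added class.

    ContentEvent event = entranceProcessor.nextEvent();   // assumed EntranceProcessor instance
    SamoaType tagged = SamoaType.of(event, "stream-0");   // deleted version defaulted a null key to "none"
    String key           = tagged.f0;                     // used by GROUP_BY_KEY partitioning in Utils.subscribe
    ContentEvent payload  = tagged.f1;                     // delivered to the destination Processor
    String streamId       = tagged.f2;                     // matched by Utils.getFilter(...) to demultiplex streams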
+ * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + +import com.google.common.collect.Lists; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.flink.helpers.Utils; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.utils.PartitioningScheme; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + + +public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable { + + private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class); + public static final int MAX_WAIT_TIME_MILLIS = 10000; + + private final Processor processor; + private final transient StreamExecutionEnvironment env; + private final SamoaDelegateFunction fun; + private transient DataStream<SamoaType> inStream; + private transient DataStream<SamoaType> outStream; + private transient List<FlinkStream> outputStreams = Lists.newArrayList(); + private transient List<Tuple3<FlinkStream, PartitioningScheme, Integer>> inputStreams = Lists.newArrayList(); + private int parallelism; + private static int numberOfPIs = 0; + private int piID; + private List<Integer> circleId; //check if we can refactor this + private boolean onIteration; + //private int circleId; //check if we can refactor this + + public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) { + this(env, proc, 1); + } + + public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc, int parallelism) { + this(env, new SamoaDelegateFunction(proc), proc, parallelism); + } + + public FlinkProcessingItem(StreamExecutionEnvironment env, SamoaDelegateFunction fun, Processor proc, int parallelism) { + super(fun); + this.env = env; + this.fun = fun; + this.processor = proc; + this.parallelism = parallelism; + this.piID = numberOfPIs++; + this.circleId = new ArrayList<Integer>() { + }; // if size equals 0, then it is part of no circle + } + + public Stream createStream() { + FlinkStream generatedStream = new FlinkStream(this); + outputStreams.add(generatedStream); + return generatedStream; + } + + public void putToStream(ContentEvent data, Stream targetStream) { + collector.collect(SamoaType.of(data, targetStream.getStreamId())); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.processor.onCreate(getComponentId()); + } + + @Override + 
public void initialise() { + for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) { + if (inputStream.f0.isInitialised()) { //if input stream is initialised + try { + DataStream toBeMerged = Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1); + if (inStream == null) { + inStream = toBeMerged; + } else { + inStream = inStream.merge(toBeMerged); + } + } catch (RuntimeException e) { + e.printStackTrace(); + System.exit(1); + } + } + } + + if (onIteration) { + inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS); + } + outStream = inStream.transform("samoaProcessor", Utils.tempTypeInfo, this).setParallelism(parallelism); + } + + public void initialiseStreams() { + for (FlinkStream stream : this.getOutputStreams()) { + stream.initialise(); + } + } + + @Override + public boolean canBeInitialised() { + for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) { + if (!inputStream.f0.isInitialised()) return false; + } + return true; + } + + @Override + public boolean isInitialised() { + return outStream != null; + } + + @Override + public Processor getProcessor() { + return processor; + } + + @Override + public void invoke() throws Exception { + while (readNext() != null) { + SamoaType t = nextObject; + fun.processEvent(t.f1); + } + } + + @Override + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.SHUFFLE, ((FlinkStream) inputStream).getSourcePiId())); + return this; + } + + @Override + public ProcessingItem connectInputKeyStream(Stream inputStream) { + inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.GROUP_BY_KEY, ((FlinkStream) inputStream).getSourcePiId())); + return this; + } + + @Override + public ProcessingItem connectInputAllStream(Stream inputStream) { + inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.BROADCAST, ((FlinkStream) inputStream).getSourcePiId())); + return this; + } + + @Override + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public List<FlinkStream> getOutputStreams() { + return outputStreams; + } + + public DataStream<SamoaType> getOutStream() { + return this.outStream; + } + + public void setOutStream(DataStream outStream) { + this.outStream = outStream; + } + + @Override + public int getComponentId() { + return piID; + } + + public boolean isPartOfCircle() { + return this.circleId.size() > 0; + } + + public List<Integer> getCircleIds() { + return circleId; + } + + public void addPItoLoop(int piId) { + this.circleId.add(piId); + } + + public DataStream<SamoaType> getInStream() { + return inStream; + } + + public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> getInputStreams() { + return inputStreams; + } + + public void setOnIteration(boolean onIteration) { + this.onIteration = onIteration; + } + + public boolean isOnIteration() { + return onIteration; + } + + static class SamoaDelegateFunction implements Function, Serializable { + private final Processor proc; + + SamoaDelegateFunction(Processor proc) { + this.proc = proc; + } + + public void processEvent(ContentEvent event) { + proc.process(event); + } + } + + public FlinkStream getInputStreamBySourceID(int sourceID) { + for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams : inputStreams) { + if (fstreams.f2 == sourceID) { + return fstreams.f0; + } + } + return null; + } + +} 
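[Sketch] Assuming a FlinkProcessingItem pi with two already-initialised inputs, one keyed and one broadcast, the following sketch shows the DataStream composition that initialise() above effectively produces; the input variables are placeholders, while the calls (subscribe, merge, iterate, transform, setParallelism) are the ones the patch itself uses.

    DataStream in = Utils.subscribe(keyedInput.getOutStream(), PartitioningScheme.GROUP_BY_KEY)
            .merge(Utils.subscribe(broadcastInput.getOutStream(), PartitioningScheme.BROADCAST));
    // PIs flagged with setOnIteration(true) first turn the merged input into an iteration head:
    // in = in.iterate(MAX_WAIT_TIME_MILLIS);
    DataStream out = in.transform("samoaProcessor", Utils.tempTypeInfo, pi).setParallelism(pi.getParallelism());
    // initialiseStreams() then creates one filtered view of 'out' per declared output FlinkStream.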
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java new file mode 100644 index 0000000..31617a7 --- /dev/null +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java @@ -0,0 +1,94 @@ +package org.apache.samoa.flink.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.flink.helpers.Utils; +import org.apache.samoa.topology.AbstractStream; + +import java.io.Serializable; + + +/** + * A stream for SAMOA based on Apache Flink's DataStream + */ +public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable { + + private static int outputCounter = 0; + private FlinkComponent procItem; + private transient DataStream<SamoaType> dataStream; + private int sourcePiId; + private String flinkStreamId; + + public FlinkStream(FlinkComponent sourcePi) { + this.procItem = sourcePi; + this.sourcePiId = sourcePi.getComponentId(); + setStreamId("stream-" + Integer.toString(outputCounter)); + flinkStreamId = "stream-" + Integer.toString(outputCounter); + outputCounter++; + } + + @Override + public void initialise() { + if (procItem instanceof FlinkProcessingItem) { + dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId())) + .setParallelism(((FlinkProcessingItem) procItem).getParallelism()); + } else + dataStream = procItem.getOutStream(); + } + + @Override + public boolean canBeInitialised() { + return procItem.isInitialised(); + } + + @Override + public boolean isInitialised() { + return dataStream != null; + } + + @Override + public DataStream getOutStream() { + return dataStream; + } + + @Override + public void put(ContentEvent event) { + ((FlinkProcessingItem) procItem).putToStream(event, this); + } + + @Override + public int getComponentId() { + return -1; //dummy number shows that it comes from a Stream + } + + public int getSourcePiId() { + return sourcePiId; + } + + @Override + public String getStreamId() { + return flinkStreamId; + } +}
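[Sketch] Putting the pieces together, a hedged summary of the runtime path of one event emitted by a user Processor; the variable names are placeholders and the commented calls paraphrase code already shown in the classes above.

    // Inside someProcessor.process(ContentEvent), SAMOA code emits as usual:
    outputStream.put(resultEvent);
    // FlinkStream.put(...) forwards to its source PI:
    //   ((FlinkProcessingItem) procItem).putToStream(resultEvent, this);
    // which tags and collects the event for Flink:
    //   collector.collect(SamoaType.of(resultEvent, targetStream.getStreamId()));
    // Downstream FlinkStreams recover only their own events during initialise() via
    //   outStream.filter(Utils.getFilter(streamId)).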
