http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java
new file mode 100644
index 0000000..eaba37d
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/fs/HDFSFileStreamSourceTest.java
@@ -0,0 +1,307 @@
+package org.apache.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedWriter;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.samoa.streams.fs.HDFSFileStreamSource;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HDFSFileStreamSourceTest {
+
+  private static final String[] HOSTS = { "localhost" };
+  private static final String BASE_DIR = "/minidfsTest";
+  private static final int NUM_FILES_IN_DIR = 4;
+  private static final int NUM_NOISE_FILES_IN_DIR = 2;
+
+  private HDFSFileStreamSource streamSource;
+
+  private Configuration config;
+  private MiniDFSCluster hdfsCluster;
+  private String hdfsURI;
+
+  @Before
+  public void setUp() throws Exception {
+    // Start MiniDFSCluster
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1)
+        .format(true);
+    hdfsCluster = builder.build();
+    hdfsCluster.waitActive();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort();
+
+    // Construct stream source
+    streamSource = new HDFSFileStreamSource();
+
+    // General config
+    config = new Configuration();
+    config.set("fs.defaultFS", hdfsURI);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
+  /*
+   * Init tests
+   */
+  @Test
+  public void testInitWithSingleFileAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    streamSource.init(config, BASE_DIR + "/1.txt", "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertTrue("Incorrect file in filePaths.",
+        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "/1.txt"));
+  }
+
+  @Test
+  public void testInitWithSingleFileAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    streamSource.init(config, BASE_DIR + "/1.txt", null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertTrue("Incorrect file in filePaths.",
+        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "/1.txt"));
+  }
+
+  @Test
+  public void testInitWithFolderAndExtension() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt";
+      filenames.add(targetFn);
+      filenames.add(hdfsURI + targetFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  @Test
+  public void testInitWithFolderAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String targetFn = BASE_DIR + "/" + Integer.toString(i);
+      filenames.add(targetFn);
+      filenames.add(hdfsURI + targetFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  /*
+   * getNextInputStream tests
+   */
+  @Test
+  public void testGetNextInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // call getNextInputStream & assertions
+    Set<String> contents = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      contents.add(Integer.toString(i));
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      InputStream inStream = streamSource.getNextInputStream();
+      assertNotNull("Unexpected end of input stream list.", inStream);
+
+      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
+      String inputRead = null;
+      try {
+        inputRead = rd.readLine();
+      } catch (IOException ioe) {
+        fail("Fail reading from stream at index:" + i + ioe.getMessage());
+      }
+      assertTrue("File content is incorrect.", contents.contains(inputRead));
+      Iterator<String> it = contents.iterator();
+      while (it.hasNext()) {
+        if (it.next().equals(inputRead)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+
+    // assert that another call to getNextInputStream will return null
+    assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream());
+  }
+
+  /*
+   * getCurrentInputStream tests
+   */
+  public void testGetCurrentInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // call getNextInputStream, getCurrentInputStream & assertions
+    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
+      InputStream inStream1 = streamSource.getNextInputStream();
+      InputStream inStream2 = streamSource.getCurrentInputStream();
+      assertSame("Incorrect current input stream.", inStream1, inStream2);
+    }
+  }
+
+  /*
+   * reset tests
+   */
+  public void testReset() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // Get the first input string
+    InputStream firstInStream = streamSource.getNextInputStream();
+    String firstInput = null;
+    assertNotNull("Unexpected end of input stream list.", firstInStream);
+
+    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
+    try {
+      firstInput = rd1.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+
+    // call getNextInputStream a few times
+    streamSource.getNextInputStream();
+
+    // call reset, call next, assert that output is 1 (the first file)
+    try {
+      streamSource.reset();
+    } catch (IOException ioe) {
+      fail("Fail resetting stream source." + ioe.getMessage());
+    }
+
+    InputStream inStream = streamSource.getNextInputStream();
+    assertNotNull("Unexpected end of input stream list.", inStream);
+
+    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
+    String inputRead = null;
+    try {
+      inputRead = rd2.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+    assertEquals("File content is incorrect.", firstInput, inputRead);
+  }
+
+  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
+    // get filesystem
+    FileSystem dfs;
+    try {
+      dfs = hdfsCluster.getFileSystem();
+    } catch (IOException ioe) {
+      fail("Could not access MiniDFSCluster" + ioe.getMessage());
+      return;
+    }
+
+    // create basedir
+    Path basedir = new Path(path);
+    try {
+      dfs.mkdirs(basedir);
+    } catch (IOException ioe) {
+      fail("Could not create DIR:" + path + "\n" + ioe.getMessage());
+      return;
+    }
+
+    // write files
+    for (int i = 1; i <= numOfFiles; i++) {
+      String fn = null;
+      if (ext != null) {
+        fn = Integer.toString(i) + "." + ext;
+      } else {
+        fn = Integer.toString(i);
+      }
+
+      try {
+        OutputStream fin = dfs.create(new Path(path, fn));
+        BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin));
+        wr.write(Integer.toString(i));
+        wr.close();
+        fin.close();
+      } catch (IOException ioe) {
+        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
+      }
+    }
+  }
+
+}

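Note on the test above: it relies on the standard MiniDFSCluster lifecycle, i.e. build and format an in-process single-node HDFS, wait for it to become active, work against its FileSystem, and shut it down afterwards. A minimal self-contained sketch of that pattern follows; the class name, path, and file content are illustrative assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class MiniDfsLifecycleSketch {
  public static void main(String[] args) throws Exception {
    // Start a single-node, in-process HDFS cluster (same builder calls as the test above).
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
        .numDataNodes(1).format(true).build();
    cluster.waitActive();
    try {
      // Write a small file through the cluster's FileSystem, as writeSimpleFiles() does.
      FileSystem fs = cluster.getFileSystem();
      try (FSDataOutputStream out = fs.create(new Path("/sketch/1.txt"))) {
        out.writeBytes("1");
      }
      // This URI is what the stream source is configured with via fs.defaultFS.
      System.out.println("fs.defaultFS = hdfs://localhost:" + cluster.getNameNodePort());
    } finally {
      // Always shut the cluster down to release ports and temporary directories.
      cluster.shutdown();
    }
  }
}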
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java
new file mode 100644
index 0000000..374ebd1
--- /dev/null
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/fs/LocalFileStreamSourceTest.java
@@ -0,0 +1,277 @@
+package org.apache.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedWriter;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.lang.SecurityException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.commons.io.FileUtils;
+import org.apache.samoa.streams.fs.LocalFileStreamSource;
+
+public class LocalFileStreamSourceTest {
+  private static final String BASE_DIR = "localfsTest";
+  private static final int NUM_FILES_IN_DIR = 4;
+  private static final int NUM_NOISE_FILES_IN_DIR = 2;
+
+  private LocalFileStreamSource streamSource;
+
+  @Before
+  public void setUp() throws Exception {
+    streamSource = new LocalFileStreamSource();
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(BASE_DIR));
+  }
+
+  @Test
+  public void testInitWithSingleFileAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    File inFile = new File(BASE_DIR, "1.txt");
+    String inFilePath = inFile.getAbsolutePath();
+    streamSource.init(inFilePath, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertEquals("Incorrect file in filePaths.", inFilePath, fn);
+  }
+
+  @Test
+  public void testInitWithSingleFileAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    File inFile = new File(BASE_DIR, "1.txt");
+    String inFilePath = inFile.getAbsolutePath();
+    streamSource.init(inFilePath, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertEquals("Incorrect file in filePaths.", inFilePath, fn);
+  }
+
+  @Test
+  public void testInitWithFolderAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    File inDir = new File(BASE_DIR);
+    String inDirPath = inDir.getAbsolutePath();
+    streamSource.init(inDirPath, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String expectedFn = (new File(inDirPath, Integer.toString(i) + ".txt")).getAbsolutePath();
+      filenames.add(expectedFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  @Test
+  public void testInitWithFolderAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    File inDir = new File(BASE_DIR);
+    String inDirPath = inDir.getAbsolutePath();
+    streamSource.init(inDirPath, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String expectedFn = (new File(inDirPath, Integer.toString(i))).getAbsolutePath();
+      filenames.add(expectedFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  /*
+   * getNextInputStream tests
+   */
+  @Test
+  public void testGetNextInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(BASE_DIR, "txt");
+
+    // call getNextInputStream & assertions
+    Set<String> contents = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      contents.add(Integer.toString(i));
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      InputStream inStream = streamSource.getNextInputStream();
+      assertNotNull("Unexpected end of input stream list.", inStream);
+
+      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
+      String inputRead = null;
+      try {
+        inputRead = rd.readLine();
+      } catch (IOException ioe) {
+        fail("Fail reading from stream at index:" + i + ioe.getMessage());
+      }
+      assertTrue("File content is incorrect.", contents.contains(inputRead));
+      Iterator<String> it = contents.iterator();
+      while (it.hasNext()) {
+        if (it.next().equals(inputRead)) {
+          it.remove();
+          break;
+        }
+      }
+    }
+
+    // assert that another call to getNextInputStream will return null
+    assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream());
+  }
+
+  /*
+   * getCurrentInputStream tests
+   */
+  public void testGetCurrentInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(BASE_DIR, "txt");
+
+    // call getNextInputStream, getCurrentInputStream & assertions
+    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
+      InputStream inStream1 = streamSource.getNextInputStream();
+      InputStream inStream2 = streamSource.getCurrentInputStream();
+      assertSame("Incorrect current input stream.", inStream1, inStream2);
+    }
+  }
+
+  /*
+   * reset tests
+   */
+  public void testReset() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(BASE_DIR, "txt");
+
+    // Get the first input string
+    InputStream firstInStream = streamSource.getNextInputStream();
+    String firstInput = null;
+    assertNotNull("Unexpected end of input stream list.", firstInStream);
+
+    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
+    try {
+      firstInput = rd1.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+
+    // call getNextInputStream a few times
+    streamSource.getNextInputStream();
+
+    // call reset, call next, assert that output is 1 (the first file)
+    try {
+      streamSource.reset();
+    } catch (IOException ioe) {
+      fail("Fail resetting stream source." + ioe.getMessage());
+    }
+
+    InputStream inStream = streamSource.getNextInputStream();
+    assertNotNull("Unexpected end of input stream list.", inStream);
+
+    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
+    String inputRead = null;
+    try {
+      inputRead = rd2.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+    assertEquals("File content is incorrect.", firstInput, inputRead);
+  }
+
+  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
+    // Create folder
+    File folder = new File(path);
+    if (!folder.exists()) {
+      try {
+        folder.mkdir();
+      } catch (SecurityException se) {
+        fail("Failed creating directory:" + path + se);
+      }
+    }
+
+    // Write files
+    for (int i = 1; i <= numOfFiles; i++) {
+      String fn = null;
+      if (ext != null) {
+        fn = Integer.toString(i) + "." + ext;
+      } else {
+        fn = Integer.toString(i);
+      }
+
+      try {
+        FileWriter fwr = new FileWriter(new File(path, fn));
+        fwr.write(Integer.toString(i));
+        fwr.close();
+      } catch (IOException ioe) {
+        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
index f00fe3c..f56ac70 100644
--- a/samoa-flink/pom.xml
+++ b/samoa-flink/pom.xml
@@ -42,7 +42,7 @@
 
     <artifactId>samoa-flink</artifactId>
     <parent>
-        <groupId>com.yahoo.labs.samoa</groupId>
+        <groupId>org.apache.samoa</groupId>
         <artifactId>samoa</artifactId>
         <version>0.3.0-SNAPSHOT</version>
     </parent>
@@ -51,7 +51,7 @@
 
     <dependencies>
         <dependency>
-            <groupId>com.yahoo.labs.samoa</groupId>
+            <groupId>org.apache.samoa</groupId>
             <artifactId>samoa-api</artifactId>
             <version>${project.version}</version>
             <exclusions>
@@ -105,7 +105,7 @@
                         </manifestEntries>
                         <manifest>
                             <addClasspath>true</addClasspath>
-                            <mainClass>com.yahoo.labs.flink.FlinkDoTask</mainClass>
+                            <mainClass>org.apache.samoa.flink.FlinkDoTask</mainClass>
                         </manifest>
                     </archive>
                 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
deleted file mode 100644
index 6069de9..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package com.yahoo.labs.flink;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.github.javacliparser.ClassOption;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
-import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory;
-import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem;
-import com.yahoo.labs.flink.topology.impl.FlinkStream;
-import com.yahoo.labs.flink.topology.impl.FlinkTopology;
-import com.yahoo.labs.samoa.tasks.Task;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-
-/**
- * Main class to run a SAMOA on Apache Flink
- */
-public class FlinkDoTask {
-
-       private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class);
-
-
-       public static void main(String[] args) throws Exception {
-               List<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-
-               args = tmpArgs.toArray(new String[0]);
-
-               // Init Task
-               StringBuilder cliString = new StringBuilder();
-               for (int i = 0; i < args.length; i++) {
-                       cliString.append(" ").append(args[i]);
-               }
-               logger.debug("Command line string = {}", cliString.toString());
-               System.out.println("Command line string = " + cliString.toString());
-
-               Task task;
-               try {
-                       task = ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
-                       logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName());
-               } catch (Exception e) {
-                       logger.error("Failed to initialize the task: ", e);
-                       System.out.println("Failed to initialize the task: " + e);
-                       return;
-               }
-               
-               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               task.setFactory(new FlinkComponentFactory(env));
-               task.init();
-               
-               logger.debug("Building Flink topology...");
-               ((FlinkTopology) task.getTopology()).build();
-               
-               logger.debug("Submitting the job...");
-               env.execute();
-
-       }
-
-
-       
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
deleted file mode 100644
index a832ee9..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
-
-/**
- * This class contains all logic needed in order to mark circles in job graphs explicitly such as
- * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological
- * (DFS) order.
- * 
- */
-public class CircleDetection {
-       private int[] index;
-       private int[] lowLink;
-       private int counter;
-       private Stack<Integer> stack;
-       private List<List<Integer>> scc;
-       List<Integer>[] graph;
-
-
-       public CircleDetection() {
-               stack = new Stack<Integer>();
-               scc = new ArrayList<>();
-       }
-
-       public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
-               graph = adjacencyList;
-               index = new int[adjacencyList.length];
-               lowLink = new int[adjacencyList.length];
-               counter = 0;
-
-               //initialize index and lowLink as "undefined"(=-1)
-               for (int j = 0; j < graph.length; j++) {
-                       index[j] = -1;
-                       lowLink[j] = -1;
-               }
-               for (int v = 0; v < graph.length; v++) {
-                       if (index[v] == -1) { //undefined.
-                               findSCC(v);
-                       }
-               }
-               return scc;
-       }
-
-       private void findSCC(int node) {
-               index[node] = counter;
-               lowLink[node] = counter;
-               counter++;
-               stack.push(node);
-
-               for (int neighbor : graph[node]) {
-                       if (index[neighbor] == -1) {
-                               findSCC(neighbor);
-                               lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]);
-                       } else if (stack.contains(neighbor)) { //if neighbor has been already visited
-                               lowLink[node] = Math.min(lowLink[node], index[neighbor]);
-                               List<Integer> sccComponent = new ArrayList<Integer>();
-                               int w;
-                               do {
-                                       w = stack.pop();
-                                       sccComponent.add(w);
-                               } while (neighbor != w);
-                               //add neighbor again, just in case it is a member of another circle
-                               stack.add(neighbor); 
-                               scc.add(sccComponent);
-                       }
-
-               }
-               if (lowLink[node] == index[node]) {
-                       int w;
-                       do {
-                               w = stack.pop();
-                       } while (node != w);
-               }
-       }
-
-}
\ No newline at end of file

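The deleted CircleDetection helper above implements a Tarjan-style strongly-connected-components search over an adjacency list and reports each circle as a list of node ids. A short hypothetical usage sketch follows, assuming the class above (or its renamed org.apache.samoa counterpart) is on the classpath; the graph is invented for illustration.

import java.util.Arrays;
import java.util.List;

public class CircleDetectionSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    // Adjacency list for 4 nodes: 0 -> 1 -> 2 -> 0 forms a circle, node 3 lies outside it.
    List<Integer>[] graph = new List[4];
    graph[0] = Arrays.asList(1);
    graph[1] = Arrays.asList(2);
    graph[2] = Arrays.asList(0, 3);
    graph[3] = Arrays.<Integer>asList();

    CircleDetection detector = new CircleDetection();
    // Each returned list is one circle, i.e. the node ids of a strongly connected component.
    List<List<Integer>> circles = detector.getCircles(graph);
    System.out.println(circles); // should print [[2, 1, 0]]: the circle through nodes 0, 1, 2
  }
}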
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
deleted file mode 100644
index fe1b960..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-
-import com.yahoo.labs.flink.topology.impl.SamoaType;
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.util.List;
-
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-
-public class Utils {
-
-       public static TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO);
-
-       public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) {
-               switch (partitioning) {
-                       case BROADCAST:
-                               return stream.broadcast();
-                       case GROUP_BY_KEY:
-                               return stream.groupBy(new KeySelector<SamoaType, String>() {
-                                       @Override
-                                       public String getKey(SamoaType samoaType) throws Exception {
-                                               return samoaType.f0;
-                                       }
-                               });
-                       case SHUFFLE:
-                       default:
-                               return stream.shuffle();
-               }
-       }
-
-       public static FilterFunction<SamoaType> getFilter(final String streamID) {
-               return new FilterFunction<SamoaType>() {
-                       @Override
-                       public boolean filter(SamoaType o) throws Exception {
-                               return o.f2.equals(streamID);
-                       }
-               };
-       }
-
-}

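The deleted Utils helper above provides the two routing primitives of this integration: subscribe() applies a SAMOA PartitioningScheme to a DataStream (broadcast, group-by-key, or shuffle), and getFilter() selects the events of one logical SAMOA stream out of an operator's tagged output. The following standalone sketch illustrates the same tag-and-filter idea with plain Flink types; the class name and sample events are invented for the example.

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TagAndFilterSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Every event carries the id of the logical stream it belongs to (here in field f0).
    DataStream<Tuple2<String, String>> tagged = env.fromElements(
        new Tuple2<String, String>("stream-0", "event-a"),
        new Tuple2<String, String>("stream-1", "event-b"),
        new Tuple2<String, String>("stream-0", "event-c"));

    // A consumer interested only in "stream-0" filters on the tag, as Utils.getFilter() does on f2.
    DataStream<Tuple2<String, String>> routed = tagged.filter(new FilterFunction<Tuple2<String, String>>() {
      @Override
      public boolean filter(Tuple2<String, String> t) {
        return t.f0.equals("stream-0");
      }
    });

    routed.print();
    env.execute();
  }
}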
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
deleted file mode 100644
index 70a7838..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Common interface of FlinkEntranceProcessingItem and FlinkProcessingItem
- */
-public interface FlinkComponent {
-
-       /**
-        * An initiation of the node. It should create the right invokables and apply the appropriate
-        * stream transformations
-        */
-       public void initialise();
-
-       /**
-        * This check is needed in order to determine whether all requirements for a Flink Component
-        * (DataStream) are satisfied in order to initialise it. This is necessary in this integration
-        * since Flink Streaming applies eager datastream generation based on transformations.
-        * 
-        * @return 
-        */
-       public boolean canBeInitialised();
-
-       /**
-        * 
-        * @return
-        */
-       public boolean isInitialised();
-
-       /**
-        * The wrapped Flink DataStream generated by this Flink component. Mind that the component
-        * should first be initialised in order to have a generated DataStream
-        * 
-        * @return
-        */
-       public DataStream<SamoaType> getOutStream();
-
-       /**
-        * A unique component id
-        * 
-        * @return
-        */
-       public int getComponentId();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
deleted file mode 100644
index fca0c1a..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.*;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * An implementation of SAMOA's ComponentFactory for Apache Flink
- */
-public class FlinkComponentFactory implements ComponentFactory {
-
-       private StreamExecutionEnvironment env;
-
-       public FlinkComponentFactory(StreamExecutionEnvironment env) {
-               this.env = env;
-       }
-
-       @Override
-       public ProcessingItem createPi(Processor processor) {
-               return new FlinkProcessingItem(env, processor);
-       }
-
-       @Override
-       public ProcessingItem createPi(Processor processor, int parallelism) {
-               return new FlinkProcessingItem(env, processor, parallelism);
-       }
-
-       @Override
-       public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
-               return new FlinkEntranceProcessingItem(env, entranceProcessor);
-       }
-
-       @Override
-       public Stream createStream(IProcessingItem sourcePi) {
-               if (sourcePi instanceof FlinkProcessingItem)
-                       return ((FlinkProcessingItem) sourcePi).createStream();
-               else return new FlinkStream((FlinkComponent) sourcePi);
-       }
-
-       @Override
-       public Topology createTopology(String topologyName) {
-               return new FlinkTopology(topologyName, env);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
deleted file mode 100644
index 5dca509..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem
-               implements FlinkComponent, Serializable {
-
-       private transient StreamExecutionEnvironment env;
-       private transient DataStream outStream;
-
-
-       public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, EntranceProcessor proc) {
-               super(proc);
-               this.env = env;
-       }
-
-       @Override
-       public void initialise() {
-               final EntranceProcessor proc = getProcessor();
-               final String streamId = getOutputStream().getStreamId();
-               final int compID = getComponentId();
-
-               
-               outStream = env.addSource(new RichSourceFunction<SamoaType>() {
-                       volatile boolean canceled;
-                       EntranceProcessor entrProc = proc;
-                       String id = streamId;
-
-                       @Override
-                       public void open(Configuration parameters) throws Exception {
-                               super.open(parameters);
-                               entrProc.onCreate(compID);
-                       }
-
-                       @Override
-                       public void run(Collector<SamoaType> collector) throws Exception {
-                               while (!canceled && entrProc.hasNext()) {
-                                       collector.collect(SamoaType.of(entrProc.nextEvent(), id));
-                               }
-                       }
-
-                       @Override
-                       public void cancel() {
-                               canceled = true;
-                       }
-               },Utils.tempTypeInfo);
-
-               ((FlinkStream) getOutputStream()).initialise();
-       }
-
-
-       @Override
-       public boolean canBeInitialised() {
-               return true;
-       }
-
-       @Override
-       public boolean isInitialised() {
-               return outStream != null;
-       }
-
-       @Override
-       public int getComponentId() {
-               return -1; // dummy number shows that it comes from an Entrance PI
-       }
-
-       @Override
-       public DataStream getOutStream() {
-               return outStream;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
deleted file mode 100644
index f92182e..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
+++ /dev/null
@@ -1,248 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-import com.google.common.collect.Lists;
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-
-public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable {
-
-       private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class);
-       public static final int MAX_WAIT_TIME_MILLIS = 10000;
-
-       private final Processor processor;
-       private final transient StreamExecutionEnvironment env;
-       private final SamoaDelegateFunction fun;
-       private transient DataStream<SamoaType> inStream;
-       private transient DataStream<SamoaType> outStream;
-       private transient List<FlinkStream> outputStreams = Lists.newArrayList();
-       private transient List<Tuple3<FlinkStream, PartitioningScheme, Integer>> inputStreams = Lists.newArrayList();
-       private int parallelism;
-       private static int numberOfPIs = 0;
-       private int piID;
-       private List<Integer> circleId; //check if we can refactor this
-       private boolean onIteration;
-       //private int circleId; //check if we can refactor this
-
-       public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) {
-               this(env, proc, 1);
-       }
-
-       public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc, int parallelism) {
-               this(env, new SamoaDelegateFunction(proc), proc, parallelism);
-       }
-
-       public FlinkProcessingItem(StreamExecutionEnvironment env, SamoaDelegateFunction fun, Processor proc, int parallelism) {
-               super(fun);
-               this.env = env;
-               this.fun = fun;
-               this.processor = proc;
-               this.parallelism = parallelism;
-               this.piID = numberOfPIs++;
-               this.circleId = new ArrayList<Integer>() {
-               }; // if size equals 0, then it is part of no circle
-       }
-
-       public Stream createStream() {
-               FlinkStream generatedStream = new FlinkStream(this);
-               outputStreams.add(generatedStream);
-               return generatedStream;
-       }
-
-       public void putToStream(ContentEvent data, Stream targetStream) {
-               collector.collect(SamoaType.of(data, targetStream.getStreamId()));
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.processor.onCreate(getComponentId());
-       }
-
-       @Override
-       public void initialise() {
-               for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
-                       if (inputStream.f0.isInitialised()) { //if input stream is initialised
-                               try {
-                                       DataStream toBeMerged = Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1);
-                                       if (inStream == null) {
-                                               inStream = toBeMerged;
-                                       } else {
-                                               inStream = inStream.merge(toBeMerged);
-                                       }
-                               } catch (RuntimeException e) {
-                                       e.printStackTrace();
-                                       System.exit(1);
-                               }
-                       }
-               }
-
-               if (onIteration) {
-                       inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS);
-               }
-               outStream = inStream.transform("samoaProcessor", Utils.tempTypeInfo, this).setParallelism(parallelism);
-       }
-
-       public void initialiseStreams() {
-               for (FlinkStream stream : this.getOutputStreams()) {
-                       stream.initialise();
-               }
-       }
-
-       @Override
-       public boolean canBeInitialised() {
-               for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
-                       if (!inputStream.f0.isInitialised()) return false;
-               }
-               return true;
-       }
-
-       @Override
-       public boolean isInitialised() {
-               return outStream != null;
-       }
-
-       @Override
-       public Processor getProcessor() {
-               return processor;
-       }
-
-       @Override
-       public void invoke() throws Exception {
-               while (readNext() != null) {
-                       SamoaType t = nextObject;
-                       fun.processEvent(t.f1);
-               }
-       }
-
-       @Override
-       public ProcessingItem connectInputShuffleStream(Stream inputStream) {
-               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.SHUFFLE, ((FlinkStream) inputStream).getSourcePiId()));
-               return this;
-       }
-
-       @Override
-       public ProcessingItem connectInputKeyStream(Stream inputStream) {
-               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.GROUP_BY_KEY, ((FlinkStream) inputStream).getSourcePiId()));
-               return this;
-       }
-
-       @Override
-       public ProcessingItem connectInputAllStream(Stream inputStream) {
-               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.BROADCAST, ((FlinkStream) inputStream).getSourcePiId()));
-               return this;
-       }
-
-       @Override
-       public int getParallelism() {
-               return parallelism;
-       }
-
-       public void setParallelism(int parallelism) {
-               this.parallelism = parallelism;
-       }
-
-       public List<FlinkStream> getOutputStreams() {
-               return outputStreams;
-       }
-
-       public DataStream<SamoaType> getOutStream() {
-               return this.outStream;
-       }
-
-       public void setOutStream(DataStream outStream) {
-               this.outStream = outStream;
-       }
-
-       @Override
-       public int getComponentId() {
-               return piID;
-       }
-
-       public boolean isPartOfCircle() {
-               return this.circleId.size() > 0;
-       }
-
-       public List<Integer> getCircleIds() {
-               return circleId;
-       }
-
-       public void addPItoLoop(int piId) {
-               this.circleId.add(piId);
-       }
-
-       public DataStream<SamoaType> getInStream() {
-               return inStream;
-       }
-
-       public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> getInputStreams() {
-               return inputStreams;
-       }
-
-       public void setOnIteration(boolean onIteration) {
-               this.onIteration = onIteration;
-       }
-
-       public boolean isOnIteration() {
-               return onIteration;
-       }
-
-       static class SamoaDelegateFunction implements Function, Serializable {
-               private final Processor proc;
-
-               SamoaDelegateFunction(Processor proc) {
-                       this.proc = proc;
-               }
-
-               public void processEvent(ContentEvent event) {
-                       proc.process(event);
-               }
-       }
-
-       public FlinkStream getInputStreamBySourceID(int sourceID) {
-               for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams : inputStreams) {
-                       if (fstreams.f2 == sourceID) {
-                               return fstreams.f0;
-                       }
-               }
-               return null;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
deleted file mode 100644
index c5cb0ed..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.topology.AbstractStream;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-import java.io.Serializable;
-
-
-/**
- * A stream for SAMOA based on Apache Flink's DataStream
- */
-public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
-
-       private static int outputCounter = 0;
-       private FlinkComponent procItem;
-       private transient DataStream<SamoaType> dataStream;
-       private int sourcePiId;
-       private String flinkStreamId;
-
-       public FlinkStream(FlinkComponent sourcePi) {
-               this.procItem = sourcePi;
-               this.sourcePiId = sourcePi.getComponentId();
-               setStreamId("stream-" + Integer.toString(outputCounter));
-               flinkStreamId = "stream-" + Integer.toString(outputCounter);
-               outputCounter++;
-       }
-
-       @Override
-       public void initialise() {
-               if (procItem instanceof FlinkProcessingItem) {
-                       dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId()))
-                       .setParallelism(((FlinkProcessingItem) procItem).getParallelism());
-               } else
-                       dataStream = procItem.getOutStream();
-       }
-
-       @Override
-       public boolean canBeInitialised() {
-               return procItem.isInitialised();
-       }
-
-       @Override
-       public boolean isInitialised() {
-               return dataStream != null;
-       }
-
-       @Override
-       public DataStream getOutStream() {
-               return dataStream;
-       }
-
-       @Override
-       public void put(ContentEvent event) {
-               ((FlinkProcessingItem) procItem).putToStream(event, this);
-       }
-
-       @Override
-       public int getComponentId() {
-               return -1; //dummy number shows that it comes from a Stream
-       }
-
-       public int getSourcePiId() {
-               return sourcePiId;
-       }
-
-       @Override
-       public String getStreamId() {
-               return flinkStreamId;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
deleted file mode 100644
index f04d792..0000000
--- a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
-import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
-import com.yahoo.labs.samoa.topology.AbstractTopology;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A SAMOA topology on Apache Flink
- * 
- * A SAMOA-Flink streaming topology is a directed graph of ProcessingItems encapsulated
- * within custom operators. Streams are tagged and filtered in each operator's output so
- * that they can be routed to the right downstream operator. Building a Flink topology from
- * a SAMOA task involves invoking all these stream transformations and, finally, marking
- * and initiating loops in the graph. We have to do that since Flink only allows explicit
- * loops in the topology, started with 'iterate()' and closed with 'closeWith()'. Thus,
- * when we build a Flink topology we have to do it incrementally from the sources, mark
- * loops and initialise them with explicit iterations.
- */
-public class FlinkTopology extends AbstractTopology {
-
-       private static final Logger logger = 
LoggerFactory.getLogger(FlinkTopology.class);
-       public static StreamExecutionEnvironment env;
-       public List<List<FlinkProcessingItem>> topologyLoops = new 
ArrayList<>();
-       public  List<Integer> backEdges = new ArrayList<Integer>();
-
-       public FlinkTopology(String name, StreamExecutionEnvironment env) {
-               super(name);
-               this.env = env;
-       }
-
-       public StreamExecutionEnvironment getEnvironment() {
-               return env;
-       }
-       
-       public void build() {
-               markCircles();
-               for (EntranceProcessingItem src : getEntranceProcessingItems()) 
{
-                       ((FlinkEntranceProcessingItem) src).initialise();
-               }
-               
initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class)));
-       }
-
-       private void initComponents(ImmutableList<FlinkProcessingItem> 
flinkComponents) {
-               if (flinkComponents.isEmpty()) return;
-
-               for (FlinkProcessingItem comp : flinkComponents) {
-                       if (comp.canBeInitialised() && !comp.isInitialised() && 
!comp.isPartOfCircle()) {
-                               comp.initialise();
-                               comp.initialiseStreams();
-
-                       }//if component is part of one or more circle
-                       else if (comp.isPartOfCircle() && 
!comp.isInitialised()) {
-                               for (Integer circle : comp.getCircleIds()) {
-                                       //check if circle can be initialized
-                                       if (checkCircleReady(circle)) {
-                                               logger.debug("Circle: " + 
circle + " can be initialised");
-                                               initialiseCircle(circle);
-                                       } else {
-                                               logger.debug("Circle cannot be 
initialised");
-                                       }
-                               }
-                       }
-
-               }
-               
initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new 
Predicate<FlinkProcessingItem>() {
-                       @Override
-                       public boolean apply(FlinkProcessingItem 
flinkComponent) {
-                               return !flinkComponent.isInitialised();
-                       }
-               })));
-       }
-
-       private void markCircles(){
-               List<FlinkProcessingItem> pis = 
Lists.newArrayList(Iterables.filter(getProcessingItems(), 
FlinkProcessingItem.class));
-               List<Integer>[] graph = new List[pis.size()];
-               FlinkProcessingItem[] processingItems = new 
FlinkProcessingItem[pis.size()];
-
-
-               for (int i=0;i<pis.size();i++) {
-                       graph[i] = new ArrayList<Integer>();
-               }
-               //construct the graph of the topology for the Processing Items 
(No entrance pi is included)
-               for (FlinkProcessingItem pi: pis) {
-                       processingItems[pi.getComponentId()] = pi;
-                       for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
is : pi.getInputStreams()) {
-                               if (is.f2 != -1) 
graph[is.f2].add(pi.getComponentId());
-                       }
-               }
-               for (int g=0;g<graph.length;g++)
-                       logger.debug(graph[g].toString());
-
-               CircleDetection detCircles = new CircleDetection();
-               List<List<Integer>> circles = detCircles.getCircles(graph);
-
-               //update PIs, regarding being part of a circle.
-               for (List<Integer> c : circles){
-                       List<FlinkProcessingItem> circle = new ArrayList<>();
-                       for (Integer it : c){
-                               circle.add(processingItems[it]);
-                               
processingItems[it].addPItoLoop(topologyLoops.size());
-                       }
-                       topologyLoops.add(circle);
-                       backEdges.add(circle.get(0).getComponentId());
-               }
-               logger.debug("Circles detected in the topology: " + circles);
-       }
-       
-
-       private boolean checkCircleReady(int circleId) {
-
-               List<Integer> circleIds = new ArrayList<>();
-
-               for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
-                       circleIds.add(pi.getComponentId());
-               }
-               //check that all incoming to the circle streams are initialised
-               for (FlinkProcessingItem procItem : 
topologyLoops.get(circleId)) {
-                       for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : procItem.getInputStreams()) {
-                               //if a inputStream is not initialized AND 
source of inputStream is not in the circle or a tail of other circle
-                               if ((!inputStream.f0.isInitialised()) && 
(!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
-                                       return false;
-                       }
-               }
-               return true;
-       }
-
-       private void initialiseCircle(int circleId) {
-               //get the head and tail of circle
-               FlinkProcessingItem tail = topologyLoops.get(circleId).get(0);
-               FlinkProcessingItem head = 
topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1);
-
-               //initialise source stream of the iteration, so as to use it 
for the iteration starting point
-               if (!head.isInitialised()) {
-                       head.setOnIteration(true);
-                       head.initialise();
-                       head.initialiseStreams();
-               }
-
-               //initialise all nodes after head
-               for (int node = topologyLoops.get(circleId).size() - 2; node >= 
0; node--) {
-                       topologyLoops.get(circleId).get(node).initialise();
-                       
topologyLoops.get(circleId).get(node).initialiseStreams();
-               }
-
-               ((IterativeDataStream) 
head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream());
-       }
-
-
-}

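The FlinkTopology javadoc above notes that Flink only supports loops through its explicit iteration API, which is why detected circles are closed with iterate()/closeWith(). A minimal, self-contained sketch of that pattern, written against the same DataStream API version this module uses; the sequence source and the decrement/filter logic are illustrative stand-ins, not part of this commit:

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeDataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IterationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // head of the loop; the timeout plays the role of MAX_WAIT_TIME_MILLIS above
            IterativeDataStream<Long> head = env.generateSequence(1, 100).iterate(5000);

            // loop body: a trivial decrement stands in for the processing items of a circle
            DataStream<Long> body = head.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    return value - 1;
                }
            });

            // feed still-positive values back to the head, as initialiseCircle() does via closeWith()
            head.closeWith(body.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return value > 0;
                }
            }));

            // values that reached zero leave the loop
            body.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return value <= 0;
                }
            }).print();

            env.execute();
        }
    }

FlinkTopology applies the same pattern in initialiseCircle(), except that the loop body is the chain of FlinkProcessingItems identified by markCircles().
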
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java 
b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
deleted file mode 100644
index 16d050a..0000000
--- 
a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package com.yahoo.labs.flink.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-
-
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-public class SamoaType extends Tuple3<String, ContentEvent, String> {
-       public SamoaType() {
-               super();
-       }
-
-       private SamoaType(String key, ContentEvent event, String streamId) {
-               super(key, event, streamId);
-       }
-
-       public static SamoaType of(ContentEvent event, String streamId) {
-               String key = event.getKey() == null ? "none" : event.getKey();
-               return new SamoaType(key, event, streamId);
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
new file mode 100644
index 0000000..cd0b82c
--- /dev/null
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
@@ -0,0 +1,88 @@
+package org.apache.samoa.flink;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.samoa.flink.helpers.CircleDetection;
+import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.flink.topology.impl.FlinkComponentFactory;
+import org.apache.samoa.flink.topology.impl.FlinkProcessingItem;
+import org.apache.samoa.flink.topology.impl.FlinkStream;
+import org.apache.samoa.flink.topology.impl.FlinkTopology;
+import org.apache.samoa.tasks.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Main class to run a SAMOA task on Apache Flink
+ */
+public class FlinkDoTask {
+
+       private static final Logger logger = 
LoggerFactory.getLogger(FlinkDoTask.class);
+
+
+       public static void main(String[] args) throws Exception {
+               List<String> tmpArgs = new 
ArrayList<String>(Arrays.asList(args));
+
+               args = tmpArgs.toArray(new String[0]);
+
+               // Init Task
+               StringBuilder cliString = new StringBuilder();
+               for (int i = 0; i < args.length; i++) {
+                       cliString.append(" ").append(args[i]);
+               }
+               logger.debug("Command line string = {}", cliString.toString());
+               System.out.println("Command line string = " + 
cliString.toString());
+
+               Task task;
+               try {
+                       task = 
ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
+                       logger.debug("Successfully instantiating {}", 
task.getClass().getCanonicalName());
+               } catch (Exception e) {
+                       logger.error("Failed to initialize the task: ", e);
+                       System.out.println("Failed to initialize the task: " + 
e);
+                       return;
+               }
+               
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               task.setFactory(new FlinkComponentFactory(env));
+               task.init();
+               
+               logger.debug("Building Flink topology...");
+               ((FlinkTopology) task.getTopology()).build();
+               
+               logger.debug("Submitting the job...");
+               env.execute();
+
+       }
+
+
+       
+
+}

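FlinkDoTask above glues the pieces together: it rebuilds the SAMOA CLI string from the program arguments, instantiates the Task through ClassOption, injects a FlinkComponentFactory, builds the FlinkTopology and submits the job to the execution environment. A hedged usage sketch; the task class name is illustrative only, and any valid SAMOA task definition string could be passed, with its options appended as extra arguments:

    // Hypothetical launcher; assumes the SAMOA API and samoa-flink jars are on the classpath.
    public class FlinkDoTaskExample {
        public static void main(String[] args) throws Exception {
            // The arguments form a SAMOA task CLI string; task options (learner, stream, etc.)
            // would be appended here in the same way.
            org.apache.samoa.flink.FlinkDoTask.main(new String[] {
                "org.apache.samoa.tasks.PrequentialEvaluation"
            });
        }
    }
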
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
new file mode 100644
index 0000000..a5a3b9d
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
@@ -0,0 +1,99 @@
+package org.apache.samoa.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * This class contains the logic needed to detect circles in job graphs so that they can be
+ * marked explicitly, as required by Apache Flink. A circle is defined as a list of node ids
+ * in DFS order.
+ * 
+ */
+public class CircleDetection {
+       private int[] index;
+       private int[] lowLink;
+       private int counter;
+       private Stack<Integer> stack;
+       private List<List<Integer>> scc;
+       List<Integer>[] graph;
+
+
+       public CircleDetection() {
+               stack = new Stack<Integer>();
+               scc = new ArrayList<>();
+       }
+
+       public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
+               graph = adjacencyList;
+               index = new int[adjacencyList.length];
+               lowLink = new int[adjacencyList.length];
+               counter = 0;
+
+               //initialize index and lowLink as "undefined"(=-1)
+               for (int j = 0; j < graph.length; j++) {
+                       index[j] = -1;
+                       lowLink[j] = -1;
+               }
+               for (int v = 0; v < graph.length; v++) {
+                       if (index[v] == -1) { //undefined.
+                               findSCC(v);
+                       }
+               }
+               return scc;
+       }
+
+       private void findSCC(int node) {
+               index[node] = counter;
+               lowLink[node] = counter;
+               counter++;
+               stack.push(node);
+
+               for (int neighbor : graph[node]) {
+                       if (index[neighbor] == -1) {
+                               findSCC(neighbor);
+                               lowLink[node] = Math.min(lowLink[node], 
lowLink[neighbor]);
+                       } else if (stack.contains(neighbor)) { //if neighbor 
has been already visited
+                               lowLink[node] = Math.min(lowLink[node], 
index[neighbor]);
+                               List<Integer> sccComponent = new 
ArrayList<Integer>();
+                               int w;
+                               do {
+                                       w = stack.pop();
+                                       sccComponent.add(w);
+                               } while (neighbor != w);
+                               //add neighbor again, just in case it is a 
member of another circle 
+                               stack.add(neighbor); 
+                               scc.add(sccComponent);
+                       }
+
+               }
+               if (lowLink[node] == index[node]) {
+                       int w;
+                       do {
+                               w = stack.pop();
+                       } while (node != w);
+               }
+       }
+
+}
\ No newline at end of file

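CircleDetection above is a Tarjan-style depth-first search over the processing-item adjacency list: whenever a back edge to a node still on the stack is found, the stack is unwound and the popped nodes are recorded as one circle. A minimal usage sketch with a toy three-node loop; the graph is illustrative only and assumes the class above is on the classpath:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.samoa.flink.helpers.CircleDetection;

    public class CircleDetectionExample {
        public static void main(String[] args) {
            // toy topology: 0 -> 1 -> 2 -> 0, i.e. one loop spanning all three nodes
            @SuppressWarnings("unchecked")
            List<Integer>[] graph = new List[3];
            graph[0] = Arrays.asList(1);
            graph[1] = Arrays.asList(2);
            graph[2] = Arrays.asList(0);

            List<List<Integer>> circles = new CircleDetection().getCircles(graph);

            // prints a single circle containing nodes 2, 1 and 0, unwound from the DFS stack
            System.out.println(circles);
        }
    }

FlinkTopology.markCircles() builds the adjacency list in exactly this raw-array form, one entry per FlinkProcessingItem, before handing it to getCircles().
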
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java 
b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
new file mode 100644
index 0000000..38b4bdc
--- /dev/null
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
@@ -0,0 +1,69 @@
+package org.apache.samoa.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.flink.topology.impl.SamoaType;
+import org.apache.samoa.utils.PartitioningScheme;
+
+import java.util.List;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+public class Utils {
+
+       public static TypeInformation<SamoaType> tempTypeInfo = new 
TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, 
TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO);
+
+       public static DataStream subscribe(DataStream<SamoaType> stream, 
PartitioningScheme partitioning) {
+               switch (partitioning) {
+                       case BROADCAST:
+                               return stream.broadcast();
+                       case GROUP_BY_KEY:
+                               return stream.groupBy(new 
KeySelector<SamoaType, String>() {
+                                       @Override
+                                       public String getKey(SamoaType 
samoaType) throws Exception {
+                                               return samoaType.f0;
+                                       }
+                               });
+                       case SHUFFLE:
+                       default:
+                               return stream.shuffle();
+               }
+       }
+
+       public static FilterFunction<SamoaType> getFilter(final String 
streamID) {
+               return new FilterFunction<SamoaType>() {
+                       @Override
+                       public boolean filter(SamoaType o) throws Exception {
+                               return o.f2.equals(streamID);
+                       }
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java
new file mode 100644
index 0000000..b61f590
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponent.java
@@ -0,0 +1,68 @@
+package org.apache.samoa.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Common interface of FlinkEntranceProcessingItem and FlinkProcessingItem
+ */
+public interface FlinkComponent {
+
+       /**
+        * Initialises the node. It should create the right invokables and apply the
+        * appropriate stream transformations.
+        */
+       public void initialise();
+
+       /**
+        * Determines whether all requirements for initialising this Flink component (DataStream)
+        * are satisfied. This is necessary in this integration since Flink Streaming generates
+        * DataStreams eagerly from stream transformations.
+        * 
+        * @return true if the component can be initialised
+        */
+       public boolean canBeInitialised();
+
+       /**
+        * Checks whether this component has already been initialised.
+        * @return true if the wrapped DataStream has been generated
+        */
+       public boolean isInitialised();
+
+       /**
+        * The wrapped Flink DataStream generated by this Flink component. Note that the
+        * component must first be initialised in order to have a generated DataStream.
+        * 
+        * @return the DataStream produced by this component
+        */
+       public DataStream<SamoaType> getOutStream();
+
+       /**
+        * A unique component id.
+        * 
+        * @return the unique id of this component
+        */
+       public int getComponentId();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java
new file mode 100644
index 0000000..93e4626
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkComponentFactory.java
@@ -0,0 +1,66 @@
+package org.apache.samoa.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.*;
+
+/**
+ * An implementation of SAMOA's ComponentFactory for Apache Flink
+ */
+public class FlinkComponentFactory implements ComponentFactory {
+
+       private StreamExecutionEnvironment env;
+
+       public FlinkComponentFactory(StreamExecutionEnvironment env) {
+               this.env = env;
+       }
+
+       @Override
+       public ProcessingItem createPi(Processor processor) {
+               return new FlinkProcessingItem(env, processor);
+       }
+
+       @Override
+       public ProcessingItem createPi(Processor processor, int parallelism) {
+               return new FlinkProcessingItem(env, processor, parallelism);
+       }
+
+       @Override
+       public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+               return new FlinkEntranceProcessingItem(env, entranceProcessor);
+       }
+
+       @Override
+       public Stream createStream(IProcessingItem sourcePi) {
+               if (sourcePi instanceof FlinkProcessingItem)
+                       return ((FlinkProcessingItem) sourcePi).createStream();
+               else return new FlinkStream((FlinkComponent) sourcePi);
+       }
+
+       @Override
+       public Topology createTopology(String topologyName) {
+               return new FlinkTopology(topologyName, env);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
new file mode 100644
index 0000000..e00874b
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
@@ -0,0 +1,101 @@
+package org.apache.samoa.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+
+import java.io.Serializable;
+
+public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem
+               implements FlinkComponent, Serializable {
+
+       private transient StreamExecutionEnvironment env;
+       private transient DataStream outStream;
+
+
+       public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, 
EntranceProcessor proc) {
+               super(proc);
+               this.env = env;
+       }
+
+       @Override
+       public void initialise() {
+               final EntranceProcessor proc = getProcessor();
+               final String streamId = getOutputStream().getStreamId();
+               final int compID = getComponentId();
+
+               
+               outStream = env.addSource(new RichSourceFunction<SamoaType>() {
+                       volatile boolean canceled;
+                       EntranceProcessor entrProc = proc;
+                       String id = streamId;
+
+                       @Override
+                       public void open(Configuration parameters) throws 
Exception {
+                               super.open(parameters);
+                               entrProc.onCreate(compID);
+                       }
+
+                       @Override
+                       public void run(Collector<SamoaType> collector) throws 
Exception {
+                               while (!canceled && entrProc.hasNext()) {
+                                       
collector.collect(SamoaType.of(entrProc.nextEvent(), id));
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+                               canceled = true;
+                       }
+               },Utils.tempTypeInfo);
+
+               ((FlinkStream) getOutputStream()).initialise();
+       }
+
+
+       @Override
+       public boolean canBeInitialised() {
+               return true;
+       }
+
+       @Override
+       public boolean isInitialised() {
+               return outStream != null;
+       }
+
+       @Override
+       public int getComponentId() {
+               return -1; // dummy number shows that it comes from an Entrance 
PI
+       }
+
+       @Override
+       public DataStream getOutStream() {
+               return outStream;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
new file mode 100644
index 0000000..3f5431c
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
@@ -0,0 +1,249 @@
+package org.apache.samoa.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> 
implements ProcessingItem, FlinkComponent, Serializable {
+
+       private static final Logger logger = 
LoggerFactory.getLogger(FlinkProcessingItem.class);
+       public static final int MAX_WAIT_TIME_MILLIS = 10000;
+
+       private final Processor processor;
+       private final transient StreamExecutionEnvironment env;
+       private final SamoaDelegateFunction fun;
+       private transient DataStream<SamoaType> inStream;
+       private transient DataStream<SamoaType> outStream;
+       private transient List<FlinkStream> outputStreams = 
Lists.newArrayList();
+       private transient List<Tuple3<FlinkStream, PartitioningScheme, 
Integer>> inputStreams = Lists.newArrayList();
+       private int parallelism;
+       private static int numberOfPIs = 0;
+       private int piID;
+       private List<Integer> circleId; //check if we can refactor this
+       private boolean onIteration;
+       //private int circleId; //check if we can refactor this
+
+       public FlinkProcessingItem(StreamExecutionEnvironment env, Processor 
proc) {
+               this(env, proc, 1);
+       }
+
+       public FlinkProcessingItem(StreamExecutionEnvironment env, Processor 
proc, int parallelism) {
+               this(env, new SamoaDelegateFunction(proc), proc, parallelism);
+       }
+
+       public FlinkProcessingItem(StreamExecutionEnvironment env, 
SamoaDelegateFunction fun, Processor proc, int parallelism) {
+               super(fun);
+               this.env = env;
+               this.fun = fun;
+               this.processor = proc;
+               this.parallelism = parallelism;
+               this.piID = numberOfPIs++;
+               this.circleId = new ArrayList<Integer>();
+               // an empty list means the PI is part of no circle
+       }
+
+       public Stream createStream() {
+               FlinkStream generatedStream = new FlinkStream(this);
+               outputStreams.add(generatedStream);
+               return generatedStream;
+       }
+
+       public void putToStream(ContentEvent data, Stream targetStream) {
+               collector.collect(SamoaType.of(data, 
targetStream.getStreamId()));
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.processor.onCreate(getComponentId());
+       }
+
+       @Override
+       public void initialise() {
+               for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : inputStreams) {
+                       if (inputStream.f0.isInitialised()) { //if input stream 
is initialised
+                               try {
+                                       DataStream toBeMerged = 
Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1);
+                                       if (inStream == null) {
+                                               inStream = toBeMerged;
+                                       } else {
+                                               inStream = 
inStream.merge(toBeMerged);
+                                       }
+                               } catch (RuntimeException e) {
+                                       e.printStackTrace();
+                                       System.exit(1);
+                               }
+                       }
+               }
+
+               if (onIteration) {
+                       inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS);
+               }
+               outStream = inStream.transform("samoaProcessor", 
Utils.tempTypeInfo, this).setParallelism(parallelism);
+       }
+
+       public void initialiseStreams() {
+               for (FlinkStream stream : this.getOutputStreams()) {
+                       stream.initialise();
+               }
+       }
+
+       @Override
+       public boolean canBeInitialised() {
+               for (Tuple3<FlinkStream, PartitioningScheme, Integer> 
inputStream : inputStreams) {
+                       if (!inputStream.f0.isInitialised()) return false;
+               }
+               return true;
+       }
+
+       @Override
+       public boolean isInitialised() {
+               return outStream != null;
+       }
+
+       @Override
+       public Processor getProcessor() {
+               return processor;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       SamoaType t = nextObject;
+                       fun.processEvent(t.f1);
+               }
+       }
+
+       @Override
+       public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, 
PartitioningScheme.SHUFFLE, ((FlinkStream) inputStream).getSourcePiId()));
+               return this;
+       }
+
+       @Override
+       public ProcessingItem connectInputKeyStream(Stream inputStream) {
+               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, 
PartitioningScheme.GROUP_BY_KEY, ((FlinkStream) inputStream).getSourcePiId()));
+               return this;
+       }
+
+       @Override
+       public ProcessingItem connectInputAllStream(Stream inputStream) {
+               inputStreams.add(new Tuple3<>((FlinkStream) inputStream, 
PartitioningScheme.BROADCAST, ((FlinkStream) inputStream).getSourcePiId()));
+               return this;
+       }
+
+       @Override
+       public int getParallelism() {
+               return parallelism;
+       }
+
+       public void setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+       }
+
+       public List<FlinkStream> getOutputStreams() {
+               return outputStreams;
+       }
+
+       public DataStream<SamoaType> getOutStream() {
+               return this.outStream;
+       }
+
+       public void setOutStream(DataStream outStream) {
+               this.outStream = outStream;
+       }
+
+       @Override
+       public int getComponentId() {
+               return piID;
+       }
+
+       public boolean isPartOfCircle() {
+               return this.circleId.size() > 0;
+       }
+
+       public List<Integer> getCircleIds() {
+               return circleId;
+       }
+
+       public void addPItoLoop(int piId) {
+               this.circleId.add(piId);
+       }
+
+       public DataStream<SamoaType> getInStream() {
+               return inStream;
+       }
+
+       public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> 
getInputStreams() {
+               return inputStreams;
+       }
+
+       public void setOnIteration(boolean onIteration) {
+               this.onIteration = onIteration;
+       }
+
+       public boolean isOnIteration() {
+               return onIteration;
+       }
+
+       static class SamoaDelegateFunction implements Function, Serializable {
+               private final Processor proc;
+
+               SamoaDelegateFunction(Processor proc) {
+                       this.proc = proc;
+               }
+
+               public void processEvent(ContentEvent event) {
+                       proc.process(event);
+               }
+       }
+
+       public FlinkStream getInputStreamBySourceID(int sourceID) {
+               for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams 
: inputStreams) {
+                       if (fstreams.f2 == sourceID) {
+                               return fstreams.f0;
+                       }
+               }
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
new file mode 100644
index 0000000..31617a7
--- /dev/null
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
@@ -0,0 +1,94 @@
+package org.apache.samoa.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.topology.AbstractStream;
+
+import java.io.Serializable;
+
+
+/**
+ * A stream for SAMOA based on Apache Flink's DataStream
+ */
+public class FlinkStream extends AbstractStream implements FlinkComponent, 
Serializable {
+
+       private static int outputCounter = 0;
+       private FlinkComponent procItem;
+       private transient DataStream<SamoaType> dataStream;
+       private int sourcePiId;
+       private String flinkStreamId;
+
+       public FlinkStream(FlinkComponent sourcePi) {
+               this.procItem = sourcePi;
+               this.sourcePiId = sourcePi.getComponentId();
+               setStreamId("stream-" + Integer.toString(outputCounter));
+               flinkStreamId = "stream-" + Integer.toString(outputCounter);
+               outputCounter++;
+       }
+
+       @Override
+       public void initialise() {
+               if (procItem instanceof FlinkProcessingItem) {
+                       dataStream = 
procItem.getOutStream().filter(Utils.getFilter(getStreamId()))
+                       .setParallelism(((FlinkProcessingItem) 
procItem).getParallelism());
+               } else
+                       dataStream = procItem.getOutStream();
+       }
+
+       @Override
+       public boolean canBeInitialised() {
+               return procItem.isInitialised();
+       }
+
+       @Override
+       public boolean isInitialised() {
+               return dataStream != null;
+       }
+
+       @Override
+       public DataStream getOutStream() {
+               return dataStream;
+       }
+
+       @Override
+       public void put(ContentEvent event) {
+               ((FlinkProcessingItem) procItem).putToStream(event, this);
+       }
+
+       @Override
+       public int getComponentId() {
+               return -1; //dummy number shows that it comes from a Stream
+       }
+
+       public int getSourcePiId() {
+               return sourcePiId;
+       }
+
+       @Override
+       public String getStreamId() {
+               return flinkStreamId;
+       }
+}

