http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 5230e9b..8abf9d6 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -19,31 +19,36 @@ package org.apache.flink.streaming.examples.join; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; import java.util.Random; +import java.util.concurrent.TimeUnit; /** * Example illustrating join over sliding windows of streams in Flink. - * <p/> + * * <p> - * his example will join two streams with a sliding window. 
One which emits - * grades and one which emits salaries of people. - * </p> - * <p/> - * <p/> + * This example will join two streams with a sliding window. One which emits grades and one which + * emits salaries of people. The input format for both sources has an additional timestamp + * as field 0. This is used to to event-time windowing. Time timestamps must be + * monotonically increasing. + * * This example shows how to: * <ul> - * <li>do windowed joins, - * <li>use tuple data types, - * <li>write a simple streaming program. + * <li>do windowed joins, + * <li>use tuple data types, + * <li>write a simple streaming program. + * </ul> */ public class WindowJoin { @@ -51,9 +56,6 @@ public class WindowJoin { // PROGRAM // ************************************************************************* - private static DataStream<Tuple2<String, Integer>> grades; - private static DataStream<Tuple2<String, Integer>> salaries; - public static void main(String[] args) throws Exception { if (!parseParameters(args)) { @@ -62,18 +64,25 @@ public class WindowJoin { // obtain execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableTimestamps(); // connect to the data sources for grades and salaries - setInputStreams(env); + Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env); + DataStream<Tuple3<Long, String, Integer>> grades = input.f0; + DataStream<Tuple3<Long, String, Integer>> salaries = input.f1; + + // extract the timestamps + grades = grades.extractTimestamp(new MyTimestampExtractor()); + salaries = salaries.extractTimestamp(new MyTimestampExtractor()); // apply a temporal join over the two stream based on the names over one // second windows DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades .join(salaries) - .onWindow(1, new MyTimestamp(0), new MyTimestamp(0)) - .where(0) - .equalTo(0) - .with(new 
MyJoinFunction()); + .where(new NameKeySelector()) + .equalTo(new NameKeySelector()) + .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS))) + .apply(new MyJoinFunction()); // emit result if (fileOutput) { @@ -98,24 +107,25 @@ public class WindowJoin { /** * Continuously emit tuples with random names and integers (grades). */ - public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> { + public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> { private static final long serialVersionUID = 1L; private Random rand; - private Tuple2<String, Integer> outTuple; + private Tuple3<Long, String, Integer> outTuple; private volatile boolean isRunning = true; private int counter; public GradeSource() { rand = new Random(); - outTuple = new Tuple2<String, Integer>(); + outTuple = new Tuple3<>(); } @Override - public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception { while (isRunning && counter < 100) { - outTuple.f0 = names[rand.nextInt(names.length)]; - outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1; + outTuple.f0 = System.currentTimeMillis(); + outTuple.f1 = names[rand.nextInt(names.length)]; + outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1; Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); counter++; ctx.collect(outTuple); @@ -131,27 +141,28 @@ public class WindowJoin { /** * Continuously emit tuples with random names and integers (salaries). 
*/ - public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> { + public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> { private static final long serialVersionUID = 1L; private transient Random rand; - private transient Tuple2<String, Integer> outTuple; + private transient Tuple3<Long, String, Integer> outTuple; private volatile boolean isRunning; private int counter; public void open(Configuration parameters) throws Exception { super.open(parameters); rand = new Random(); - outTuple = new Tuple2<String, Integer>(); + outTuple = new Tuple3<Long, String, Integer>(); isRunning = true; } @Override - public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception { while (isRunning && counter < 100) { - outTuple.f0 = names[rand.nextInt(names.length)]; - outTuple.f1 = rand.nextInt(SALARY_MAX) + 1; + outTuple.f0 = System.currentTimeMillis(); + outTuple.f1 = names[rand.nextInt(names.length)]; + outTuple.f2 = rand.nextInt(SALARY_MAX) + 1; Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); counter++; ctx.collect(outTuple); @@ -164,7 +175,7 @@ public class WindowJoin { } } - public static class MySourceMap extends RichMapFunction<String, Tuple2<String, Integer>> { + public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> { private static final long serialVersionUID = 1L; @@ -175,44 +186,55 @@ public class WindowJoin { } @Override - public Tuple2<String, Integer> map(String line) throws Exception { + public Tuple3<Long, String, Integer> map(String line) throws Exception { record = line.substring(1, line.length() - 1).split(","); - return new Tuple2<String, Integer>(record[0], Integer.parseInt(record[1])); + return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2])); } } public static class MyJoinFunction implements - JoinFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> { + JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> { private static final long serialVersionUID = 1L; - private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>(); + private Tuple3<String, Integer, Integer> joined = new Tuple3<>(); @Override - public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, - Tuple2<String, Integer> second) throws Exception { - joined.f0 = first.f0; - joined.f1 = first.f1; - joined.f2 = second.f1; + public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first, + Tuple3<Long, String, Integer> second) throws Exception { + joined.f0 = first.f1; + joined.f1 = first.f2; + joined.f2 = second.f2; return joined; } } - public static class MyTimestamp implements Timestamp<Tuple2<String, Integer>> { - + private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> { private static final long serialVersionUID = 1L; - private int counter; + @Override + public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) { + return element.f0; + } - public MyTimestamp(int starttime) { - this.counter = starttime; + @Override + public long emitWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) { + return element.f0 - 1; } @Override - public long getTimestamp(Tuple2<String, Integer> value) { - counter += SLEEP_TIME; - return counter; + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + } + + private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple3<Long, String, Integer> value) throws Exception { + return value.f1; } } @@ -253,7 +275,12 @@ public class WindowJoin { return true; } - private static void 
setInputStreams(StreamExecutionEnvironment env) { + private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams( + StreamExecutionEnvironment env) { + + DataStream<Tuple3<Long, String, Integer>> grades; + DataStream<Tuple3<Long, String, Integer>> salaries; + if (fileInput) { grades = env.readTextFile(gradesPath).map(new MySourceMap()); salaries = env.readTextFile(salariesPath).map(new MySourceMap()); @@ -261,5 +288,8 @@ public class WindowJoin { grades = env.addSource(new GradeSource()); salaries = env.addSource(new SalarySource()); } + + return Tuple2.of(grades, salaries); } + }
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java index 23d29b1..15c1280 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java @@ -19,42 +19,42 @@ package org.apache.flink.streaming.examples.join.util; public class WindowJoinData { - public static final String GRADES_INPUT = "(john,5)\n" + "(tom,3)\n" + "(alice,1)\n" + "(grace,5)\n" + - "(john,4)\n" + "(bob,1)\n" + "(alice,2)\n" + "(alice,3)\n" + "(bob,5)\n" + "(alice,3)\n" + "(tom,5)\n" + - "(john,2)\n" + "(john,1)\n" + "(grace,2)\n" + "(jerry,2)\n" + "(tom,4)\n" + "(bob,4)\n" + "(bob,2)\n" + - "(tom,2)\n" + "(alice,5)\n" + "(grace,5)\n" + "(grace,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(tom,1)\n" + - "(jerry,5)\n" + "(john,3)\n" + "(john,4)\n" + "(john,1)\n" + "(jerry,3)\n" + "(grace,3)\n" + "(bob,3)\n" + - "(john,3)\n" + "(jerry,4)\n" + "(tom,5)\n" + "(tom,4)\n" + "(john,2)\n" + "(jerry,1)\n" + "(bob,1)\n" + - "(john,5)\n" + "(grace,4)\n" + "(tom,5)\n" + "(john,4)\n" + "(tom,1)\n" + "(grace,1)\n" + "(john,2)\n" + - "(jerry,3)\n" + "(jerry,5)\n" + "(tom,2)\n" + "(tom,2)\n" + "(alice,4)\n" + "(tom,4)\n" + "(jerry,4)\n" + - "(john,3)\n" + "(grace,4)\n" + "(tom,3)\n" + "(jerry,4)\n" + "(john,5)\n" + "(john,4)\n" + "(jerry,1)\n" + - "(john,5)\n" + "(alice,2)\n" + "(tom,1)\n" + "(alice,5)\n" + 
"(grace,4)\n" + "(bob,4)\n" + "(jerry,1)\n" + - "(john,5)\n" + "(tom,4)\n" + "(tom,5)\n" + "(jerry,5)\n" + "(tom,1)\n" + "(grace,3)\n" + "(bob,5)\n" + - "(john,1)\n" + "(alice,1)\n" + "(grace,3)\n" + "(grace,1)\n" + "(jerry,1)\n" + "(jerry,4)\n" + - "(bob,4)\n" + "(alice,3)\n" + "(tom,5)\n" + "(alice,4)\n" + "(alice,4)\n" + "(grace,4)\n" + "(john,5)\n" + - "(john,5)\n" + "(grace,4)\n" + "(tom,4)\n" + "(john,4)\n" + "(john,5)\n" + "(alice,5)\n" + "(jerry,5)\n" + - "(john,3)\n" + "(tom,5)\n" + "(jerry,4)\n" + "(grace,4)\n" + "(john,3)\n" + "(bob,2)"; + public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" + + "(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" + + "(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" + + "(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" + + "(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" + + "(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" + + "(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" + + "(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" + + "(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" + + "(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" + + "(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" + + "(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + 
"(11,jerry,4)\n" + + "(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" + + "(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" + + "(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)"; - public static final String SALARIES_INPUT = "(john,6469)\n" + "(jerry,6760)\n" + "(jerry,8069)\n" + - "(tom,3662)\n" + "(grace,8427)\n" + "(john,9425)\n" + "(bob,9018)\n" + "(john,352)\n" + "(tom,3770)\n" + - "(grace,7622)\n" + "(jerry,7441)\n" + "(alice,1468)\n" + "(bob,5472)\n" + "(grace,898)\n" + - "(tom,3849)\n" + "(grace,1865)\n" + "(alice,5582)\n" + "(john,9511)\n" + "(alice,1541)\n" + - "(john,2477)\n" + "(grace,3561)\n" + "(john,1670)\n" + "(grace,7290)\n" + "(grace,6565)\n" + - "(tom,6179)\n" + "(tom,1601)\n" + "(john,2940)\n" + "(bob,4685)\n" + "(bob,710)\n" + "(bob,5936)\n" + - "(jerry,1412)\n" + "(grace,6515)\n" + "(grace,3321)\n" + "(tom,8088)\n" + "(john,2876)\n" + - "(bob,9896)\n" + "(grace,7368)\n" + "(grace,9749)\n" + "(bob,2048)\n" + "(alice,4782)\n" + - "(alice,3375)\n" + "(tom,5841)\n" + "(bob,958)\n" + "(bob,5258)\n" + "(tom,3935)\n" + "(jerry,4394)\n" + - "(alice,102)\n" + "(alice,4931)\n" + "(alice,5240)\n" + "(jerry,7951)\n" + "(john,5675)\n" + - "(bob,609)\n" + "(alice,5997)\n" + "(jerry,9651)\n" + "(alice,1328)\n" + "(bob,1022)\n" + - "(grace,2578)\n" + "(jerry,9704)\n" + "(tom,4476)\n" + "(grace,3784)\n" + "(alice,6144)\n" + - "(bob,6213)\n" + "(alice,7525)\n" + "(jerry,2908)\n" + "(grace,8464)\n" + "(jerry,9920)\n" + - "(bob,3720)\n" + "(bob,7612)\n" + "(alice,7211)\n" + "(jerry,6484)\n" + "(alice,1711)\n" + - "(jerry,5994)\n" + "(grace,928)\n" + "(jerry,2492)\n" + "(grace,9080)\n" + "(tom,4330)\n" + - "(bob,8302)\n" + "(john,4981)\n" + "(tom,1781)\n" + "(grace,1379)\n" + "(jerry,3700)\n" + - "(jerry,3584)\n" + "(jerry,2038)\n" + "(jerry,3902)\n" + 
"(tom,1336)\n" + "(jerry,7500)\n" + - "(tom,3648)\n" + "(alice,2533)\n" + "(tom,8685)\n" + "(bob,3968)\n" + "(tom,3241)\n" + "(bob,7461)\n" + - "(jerry,2138)\n" + "(alice,7503)\n" + "(alice,6424)\n" + "(tom,140)\n" + "(john,9802)\n" + - "(grace,2977)\n" + "(grace,889)\n" + "(john,1338)"; + public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" + + "(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" + + "(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" + + "(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" + + "(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" + + "(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" + + "(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" + + "(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" + + "(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" + + "(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" + + "(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" + + "(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" + + "(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" + + "(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" + + "(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" + + "(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + 
"(15,grace,1379)\n" + "(15,jerry,3700)\n" + + "(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" + + "(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" + + "(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" + + "(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)"; private WindowJoinData() { } http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala index 239f1fa..225dab7 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -21,6 +21,8 @@ package org.apache.flink.streaming.scala.examples.join import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows +import org.apache.flink.streaming.api.windowing.time.Time import scala.Stream._ import scala.language.postfixOps @@ -32,8 +34,8 @@ object WindowJoin { // PROGRAM // ************************************************************************* - case class Grade(name: String, grade: Int) - case class Salary(name: String, salary: Int) + case class Grade(time: Long, name: String, grade: Int) + case class Salary(time: Long, name: 
String, salary: Int) case class Person(name: String, grade: Int, salary: Int) def main(args: Array[String]) { @@ -43,6 +45,7 @@ object WindowJoin { } val env = StreamExecutionEnvironment.getExecutionEnvironment + env.getConfig.enableTimestamps() //Create streams for grades and salaries by mapping the inputs to the corresponding objects val grades = setGradesInput(env) @@ -50,11 +53,11 @@ object WindowJoin { //Join the two input streams by name on the last 2 seconds every second and create new //Person objects containing both grade and salary - val joined = - grades.join(salaries).onWindow(2, TimeUnit.SECONDS) - .every(1, TimeUnit.SECONDS) - .where("name") - .equalTo("name") { (g, s) => Person(g.name, g.grade, s.salary) } + val joined = grades.join(salaries) + .where(_.name) + .equalTo(_.name) + .window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) + .apply { (g, s) => Person(g.name, g.grade, s.salary) } if (fileOutput) { joined.writeAsText(outputPath) @@ -74,27 +77,27 @@ object WindowJoin { val salaryMax = 10000 val sleepInterval = 100 - def gradeStream(): Stream[(String, Int)] = { - def gradeMapper(names: Array[String])(x: Int): (String, Int) = + def gradeStream: Stream[(Long, String, Int)] = { + def gradeMapper(names: Array[String])(x: Int): (Long, String, Int) = { if (x % sleepInterval == 0) Thread.sleep(sleepInterval) - (names(Random.nextInt(names.length)), Random.nextInt(gradeCount)) + (System.currentTimeMillis(),names(Random.nextInt(names.length)),Random.nextInt(gradeCount)) } range(1, 100).map(gradeMapper(names)) } - def salaryStream(): Stream[(String, Int)] = { - def salaryMapper(x: Int): (String, Int) = + def salaryStream: Stream[(Long, String, Int)] = { + def salaryMapper(x: Int): (Long, String, Int) = { if (x % sleepInterval == 0) Thread.sleep(sleepInterval) - (names(Random.nextInt(names.length)), Random.nextInt(salaryMax)) + (System.currentTimeMillis(), names(Random.nextInt(names.length)), Random.nextInt(salaryMax)) 
} range(1, 100).map(salaryMapper) } - def parseMap(line : String): (String, Int) = { + def parseMap(line : String): (Long, String, Int) = { val record = line.substring(1, line.length - 1).split(",") - (record(0), record(1).toInt) + (record(0).toLong, record(1), record(2).toInt) } // ************************************************************************* @@ -130,23 +133,23 @@ object WindowJoin { System.out.println(" Provide parameter to write to file.") System.out.println(" Usage: WindowJoin <result path>") } - return true + true } private def setGradesInput(env: StreamExecutionEnvironment) : DataStream[Grade] = { if (fileInput) { - env.readTextFile(gradesPath).map(parseMap(_)).map(x => Grade(x._1, x._2)) + env.readTextFile(gradesPath).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3)) } else { - env.fromCollection(gradeStream).map(x => Grade(x._1, x._2)) + env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3)) } } private def setSalariesInput(env: StreamExecutionEnvironment) : DataStream[Salary] = { if (fileInput) { - env.readTextFile(salariesPath).map(parseMap(_)).map(x => Salary(x._1, x._2)) + env.readTextFile(salariesPath).map(parseMap _).map(x => Salary(x._1, x._2, x._3)) } else { - env.fromCollection(salaryStream).map(x => Salary(x._1, x._2)) + env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3)) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java index aae4b93..e657b67 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java @@ -1,51 +1,50 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. 
-// */ -// -// TODO: reactivate once we have new join implementation -//package org.apache.flink.streaming.test.exampleJavaPrograms.join; -// -//import org.apache.flink.streaming.examples.join.WindowJoin; -//import org.apache.flink.streaming.examples.join.util.WindowJoinData; -//import org.apache.flink.streaming.util.StreamingProgramTestBase; -// -//public class WindowJoinITCase extends StreamingProgramTestBase { -// -// protected String gradesPath; -// protected String salariesPath; -// protected String resultPath; -// -// @Override -// protected void preSubmit() throws Exception { -// gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); -// salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); -// resultPath = getTempDirPath("result"); -// } -// -// @Override -// protected void postSubmit() throws Exception { -// // since the two sides of the join might have different speed -// // the exact output can not be checked just whether it is well-formed -// // checks that the result lines look like e.g. (bob, 2, 2015) -// checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)"); -// } -// -// @Override -// protected void testProgram() throws Exception { -// WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath}); -// } -//} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.exampleJavaPrograms.join; + +import org.apache.flink.streaming.examples.join.WindowJoin; +import org.apache.flink.streaming.examples.join.util.WindowJoinData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class WindowJoinITCase extends StreamingProgramTestBase { + + protected String gradesPath; + protected String salariesPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); + salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + // since the two sides of the join might have different speed + // the exact output can not be checked just whether it is well-formed + // checks that the result lines look like e.g. 
(bob, 2, 2015) + checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)"); + } + + @Override + protected void testProgram() throws Exception { + WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java index 0aa884f..08ce890 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java @@ -1,51 +1,50 @@ -///* -// * Licensed to the Apache Software Foundation (ASF) under one -// * or more contributor license agreements. See the NOTICE file -// * distributed with this work for additional information -// * regarding copyright ownership. The ASF licenses this file -// * to you under the Apache License, Version 2.0 (the -// * "License"); you may not use this file except in compliance -// * with the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// TODO: reactivate once we have new join implementation -//package org.apache.flink.streaming.test.exampleScalaPrograms.join; -// -//import org.apache.flink.streaming.scala.examples.join.WindowJoin; -//import org.apache.flink.streaming.examples.join.util.WindowJoinData; -//import org.apache.flink.streaming.util.StreamingProgramTestBase; -// -//public class WindowJoinITCase extends StreamingProgramTestBase { -// -// protected String gradesPath; -// protected String salariesPath; -// protected String resultPath; -// -// @Override -// protected void preSubmit() throws Exception { -// gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); -// salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); -// resultPath = getTempDirPath("result"); -// } -// -// @Override -// protected void postSubmit() throws Exception { -// // since the two sides of the join might have different speed -// // the exact output can not be checked just whether it is well-formed -// // checks that the result lines look like e.g. Person(bob, 2, 2015) -// checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)"); -// } -// -// @Override -// protected void testProgram() throws Exception { -// WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath}); -// } -//} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.exampleScalaPrograms.join; + +import org.apache.flink.streaming.scala.examples.join.WindowJoin; +import org.apache.flink.streaming.examples.join.util.WindowJoinData; +import org.apache.flink.streaming.util.StreamingProgramTestBase; + +public class WindowJoinITCase extends StreamingProgramTestBase { + + protected String gradesPath; + protected String salariesPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT); + salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + // since the two sides of the join might have different speed + // the exact output can not be checked just whether it is well-formed + // checks that the result lines look like e.g. 
Person(bob, 2, 2015) + checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)"); + } + + @Override + protected void testProgram() throws Exception { + WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath}); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala new file mode 100644 index 0000000..1b16e44 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.functions.CoGroupFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams} +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +import scala.reflect.ClassTag + +/** + * `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped. + * A streaming co-group operation is evaluated over elements in a window. + * + * To finalize the co-group operation you also need to specify a [[KeySelector]] for + * both the first and second input and a [[WindowAssigner]]. + * + * Note: Right now, the groups are being built in memory so you need to ensure that they don't + * get too big. Otherwise the JVM might crash. + * + * Example: + * + * {{{ + * val one: DataStream[(String, Int)] = ... + * val two: DataStream[(String, Int)] = ... + * + * val result = one.coGroup(two) + * .where {t => ... } + * .equalTo {t => ... } + * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) + * .apply(new MyCoGroupFunction()) + * }}} + */ +object CoGroupedStreams { + + /** + * A co-group operation that does not yet have its [[KeySelector]]s defined. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + */ + class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { + + /** + * Specifies a [[KeySelector]] for elements from the first input.
+ */ + def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T1, KEY] { + def getKey(in: T1) = cleanFun(in) + } + new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null) + } + + /** + * Specifies a [[KeySelector]] for elements from the second input. + */ + def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T2, KEY] { + def getKey(in: T2) = cleanFun(in) + } + new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * A co-group operation that has [[KeySelector]]s defined for either both or + * one input. + * + * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]] + * before you can proceed with specifying a [[WindowAssigner]] using [[window()]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + */ + class WithKey[T1, T2, KEY]( + input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY]) { + + /** + * Specifies a [[KeySelector]] for elements from the first input.
+ */ + def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T1, KEY] { + def getKey(in: T1) = cleanFun(in) + } + new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2) + } + + /** + * Specifies a [[KeySelector]] for elements from the second input. + */ + def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T2, KEY] { + def getKey(in: T2) = cleanFun(in) + } + new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector) + } + + /** + * Specifies the window on which the co-group operation works. + */ + def window[W <: Window]( + assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W]) + : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = { + if (keySelector1 == null || keySelector2 == null) { + throw new UnsupportedOperationException("You first need to specify KeySelectors for both" + + "inputs using where() and equalTo().") + } + new CoGroupedStreams.WithWindow[T1, T2, KEY, W]( + input1, + input2, + keySelector1, + keySelector2, + clean(assigner), + null, + null) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * A co-group operation that has [[KeySelector]]s defined for both inputs as + * well as a [[WindowAssigner]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + * @tparam W Type of { @link Window} on which the co-group operation works. 
+ */ + class WithWindow[T1, T2, KEY, W <: Window]( + input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY], + windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W], + trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W], + evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) { + + + /** + * Sets the [[Trigger]] that should be used to trigger window emission. + */ + def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) + : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = { + new WithWindow[T1, T2, KEY, W]( + input1, + input2, + keySelector1, + keySelector2, + windowAssigner, + newTrigger, + evictor) + } + + /** + * Sets the [[Evictor]] that should be used to evict elements from a window before emission. + * + * Note: When using an evictor window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) + : CoGroupedStreams.WithWindow[T1, T2, KEY, W] = { + new WithWindow[T1, T2, KEY, W]( + input1, + input2, + keySelector1, + keySelector2, + windowAssigner, + trigger, + newEvictor) + } + + /** + * Completes the co-group operation with the user function that is executed + * for windowed groups. + */ + def apply[O: TypeInformation: ClassTag]( + fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = { + require(fun != null, "CoGroup function must not be null.") + + val coGrouper = new CoGroupFunction[T1, T2, O] { + val cleanFun = clean(fun) + def coGroup( + left: java.lang.Iterable[T1], + right: java.lang.Iterable[T2], out: Collector[O]) = { + out.collect(cleanFun(left.iterator().asScala, right.iterator().asScala)) + } + } + apply(coGrouper) + } + + /** + * Completes the co-group operation with the user function that is executed + * for windowed groups. 
+ */ + def apply[O: TypeInformation: ClassTag]( + fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = { + require(fun != null, "CoGroup function must not be null.") + + val coGrouper = new CoGroupFunction[T1, T2, O] { + val cleanFun = clean(fun) + def coGroup( + left: java.lang.Iterable[T1], + right: java.lang.Iterable[T2], out: Collector[O]) = { + cleanFun(left.iterator.asScala, right.iterator.asScala, out) + } + } + apply(coGrouper) + } + + /** + * Completes the co-group operation with the user function that is executed + * for windowed groups. + */ + def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = { + + val coGroup = JavaCoGroupedStreams.createCoGroup(input1.getJavaStream, input2.getJavaStream) + + coGroup + .where(keySelector1) + .equalTo(keySelector2) + .window(windowAssigner) + .trigger(trigger) + .evictor(evictor) + .apply(clean(function), implicitly[TypeInformation[T]]) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + } + } + + + /** + * Creates a new co-group operation from the two given inputs. 
+ */ + def createCoGroup[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) + : CoGroupedStreams.Unspecified[T1, T2] = { + new CoGroupedStreams.Unspecified[T1, T2](input1, input2) + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 8aeacb4..7babc40 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -751,18 +751,20 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** - * Initiates a temporal Join transformation that joins the elements of two - * data streams on key equality over a specified time window. - * - * This method returns a StreamJoinOperator on which the - * .onWindow(..) should be called to define the - * window, and then the .where(..) and .equalTo(..) methods can be used to defin - * the join keys.</p> The user can also use the apply method of the returned JoinedStream - * to use custom join function. - * + * Creates a co-group operation. See [[CoGroupedStreams]] for an example of how the keys + * and window can be specified. */ - def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = - new StreamJoinOperator[T, R](javaStream, stream.getJavaStream) + def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams.Unspecified[T, T2] = { + CoGroupedStreams.createCoGroup(this, otherStream) + } + + /** + * Creates a join operation. 
See [[JoinedStreams]] for an example of how the keys + * and window can be specified. + */ + def join[T2](otherStream: DataStream[T2]): JoinedStreams.Unspecified[T, T2] = { + JoinedStreams.createJoin(this, otherStream) + } /** * Writes a DataStream to the standard output stream (stdout). For each http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala new file mode 100644 index 0000000..be059b8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams} +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner +import org.apache.flink.streaming.api.windowing.evictors.Evictor +import org.apache.flink.streaming.api.windowing.triggers.Trigger +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.reflect.ClassTag + +/** + * `JoinedStreams` represents two [[DataStream]]s that have been joined. + * A streaming join operation is evaluated over elements in a window. + * + * To finalize the join operation you also need to specify a [[KeySelector]] for + * both the first and second input and a [[WindowAssigner]]. + * + * Note: Right now, the groups are being built in memory so you need to ensure that they don't + * get too big. Otherwise the JVM might crash. + * + * Example: + * + * {{{ + * val one: DataStream[(String, Int)] = ... + * val two: DataStream[(String, Int)] = ... + * + * val result = one.join(two) + * .where {t => ... } + * .equalTo {t => ... } + * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) + * .apply(new MyJoinFunction()) + * }}} + */ +object JoinedStreams { + + /** + * A join operation that does not yet have its [[KeySelector]]s defined. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + */ + class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { + + /** + * Specifies a [[KeySelector]] for elements from the first input.
+ */ + def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T1, KEY] { + def getKey(in: T1) = cleanFun(in) + } + new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null) + } + + /** + * Specifies a [[KeySelector]] for elements from the second input. + */ + def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T2, KEY] { + def getKey(in: T2) = cleanFun(in) + } + new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * A join operation that has [[KeySelector]]s defined for either both or + * one input. + * + * You need to specify a [[KeySelector]] for both inputs using [[where()]] and [[equalTo()]] + * before you can proceed with specifying a [[WindowAssigner]] using [[window()]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + */ + class WithKey[T1, T2, KEY]( + input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY]) { + + /** + * Specifies a [[KeySelector]] for elements from the first input.
+ */ + def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T1, KEY] { + def getKey(in: T1) = cleanFun(in) + } + new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2) + } + + /** + * Specifies a [[KeySelector]] for elements from the second input. + */ + def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = { + val cleanFun = clean(keySelector) + val javaSelector = new KeySelector[T2, KEY] { + def getKey(in: T2) = cleanFun(in) + } + new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector) + } + + /** + * Specifies the window on which the join operation works. + */ + def window[W <: Window]( + assigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W]) + : JoinedStreams.WithWindow[T1, T2, KEY, W] = { + if (keySelector1 == null || keySelector2 == null) { + throw new UnsupportedOperationException("You first need to specify KeySelectors for both" + + "inputs using where() and equalTo().") + } + new JoinedStreams.WithWindow[T1, T2, KEY, W]( + input1, + input2, + keySelector1, + keySelector2, + clean(assigner), + null, + null) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + } + } + + /** + * A join operation that has [[KeySelector]]s defined for both inputs as + * well as a [[WindowAssigner]]. + * + * @tparam T1 Type of the elements from the first input + * @tparam T2 Type of the elements from the second input + * @tparam KEY Type of the key. This must be the same for both inputs + * @tparam W Type of { @link Window} on which the join operation works. 
+ */ + class WithWindow[T1, T2, KEY, W <: Window]( + input1: DataStream[T1], + input2: DataStream[T2], + keySelector1: KeySelector[T1, KEY], + keySelector2: KeySelector[T2, KEY], + windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W], + trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W], + evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) { + + + /** + * Sets the [[Trigger]] that should be used to trigger window emission. + */ + def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) + : JoinedStreams.WithWindow[T1, T2, KEY, W] = { + new WithWindow[T1, T2, KEY, W]( + input1, + input2, + keySelector1, + keySelector2, + windowAssigner, + newTrigger, + evictor) + } + + /** + * Sets the [[Evictor]] that should be used to evict elements from a window before emission. + * + * Note: When using an evictor window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) + : JoinedStreams.WithWindow[T1, T2, KEY, W] = { + new WithWindow[T1, T2, KEY, W]( + input1, + input2, + keySelector1, + keySelector2, + windowAssigner, + trigger, + newEvictor) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + */ + def apply[O: TypeInformation: ClassTag](fun: (T1, T2) => O): DataStream[O] = { + require(fun != null, "Join function must not be null.") + + val joiner = new FlatJoinFunction[T1, T2, O] { + val cleanFun = clean(fun) + def join(left: T1, right: T2, out: Collector[O]) = { + out.collect(cleanFun(left, right)) + } + } + apply(joiner) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. 
+ */ + + def apply[O: TypeInformation: ClassTag](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = { + require(fun != null, "Join function must not be null.") + + val joiner = new FlatJoinFunction[T1, T2, O] { + val cleanFun = clean(fun) + def join(left: T1, right: T2, out: Collector[O]) = { + cleanFun(left, right, out) + } + } + apply(joiner) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + */ + def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = { + + val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream) + + join + .where(keySelector1) + .equalTo(keySelector2) + .window(windowAssigner) + .trigger(trigger) + .evictor(evictor) + .apply(clean(function), implicitly[TypeInformation[T]]) + } + + /** + * Completes the join operation with the user function that is executed + * for windowed groups. + */ + def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = { + + val join = JavaJoinedStreams.createJoin(input1.getJavaStream, input2.getJavaStream) + + join + .where(keySelector1) + .equalTo(keySelector2) + .window(windowAssigner) + .trigger(trigger) + .evictor(evictor) + .apply(clean(function), implicitly[TypeInformation[T]]) + } + + /** + * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning + * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]. + */ + private[flink] def clean[F <: AnyRef](f: F): F = { + new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f) + } + } + + + /** + * Creates a new join operation from the two given inputs. 
+ */ + def createJoin[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) + : JoinedStreams.Unspecified[T1, T2] = { + new JoinedStreams.Unspecified[T1, T2](input1, input2) + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala deleted file mode 100644 index e0bbaf8..0000000 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.scala - -import java.util.concurrent.TimeUnit - -import org.apache.flink.api.common.functions.JoinFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.operators.Keys -import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} -import org.apache.flink.streaming.util.keys.KeySelectorUtil - -import scala.Array.canBuildFrom -import scala.reflect.ClassTag - -class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends -TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) { - - override def createNextWindowOperator() = { - new StreamJoinOperator.JoinWindow[I1, I2](this) - } -} - -object StreamJoinOperator { - - class JoinWindow[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2]) extends - TemporalWindow[JoinWindow[I1, I2]] { - - private[flink] val type1 = op.input1.getType() - - /** - * Continues a temporal Join transformation by defining - * the fields in the first stream to be used as keys for the join. - * The resulting incomplete join can be completed by JoinPredicate.equalTo() - * to define the second key. - */ - def where(fields: Int*) = { - new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray, type1), - type1, - op.input1.getExecutionEnvironment.getConfig)) - } - - /** - * Continues a temporal Join transformation by defining - * the fields in the first stream to be used as keys for the join. - * The resulting incomplete join can be completed by JoinPredicate.equalTo() - * to define the second key. 
- */ - def where(firstField: String, otherFields: String*) = - new JoinPredicate[I1, I2](op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray, type1), - type1, - op.input1.getExecutionEnvironment.getConfig)) - - /** - * Continues a temporal Join transformation by defining - * the keyselector function that will be used to extract keys from the first stream - * for the join. - * The resulting incomplete join can be completed by JoinPredicate.equalTo() - * to define the second key. - */ - def where[K: TypeInformation](fun: (I1) => K) = { - val keyType = implicitly[TypeInformation[K]] - val cleanFun = op.input1.clean(fun) - val keyExtractor = new KeySelector[I1, K] { - def getKey(in: I1) = cleanFun(in) - } - new JoinPredicate[I1, I2](op, keyExtractor) - } - - override def every(length: Long, timeUnit: TimeUnit): JoinWindow[I1, I2] = { - every(timeUnit.toMillis(length)) - } - - override def every(length: Long): JoinWindow[I1, I2] = { - op.slideInterval = length - this - } - - } - - class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], - private[flink] val keys1: KeySelector[I1, _]) { - private[flink] var keys2: KeySelector[I2, _] = null - private[flink] val type2 = op.input2.getType() - - /** - * Creates a temporal join transformation by defining the second join key. - * The returned transformation wrapes each joined element pair in a tuple2: - * (first, second) - * To define a custom wrapping, use JoinedStream.apply(...) - */ - def equalTo(fields: Int*): JoinedStream[I1, I2] = { - finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(fields.toArray, type2), - type2, - op.input1.getExecutionEnvironment.getConfig)) - } - - /** - * Creates a temporal join transformation by defining the second join key. - * The returned transformation wrapes each joined element pair in a tuple2: - * (first, second) - * To define a custom wrapping, use JoinedStream.apply(...) 
- */ - def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = - finish(KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys(firstField +: otherFields.toArray, type2), - type2, - op.input1.getExecutionEnvironment.getConfig)) - - /** - * Creates a temporal join transformation by defining the second join key. - * The returned transformation wrapes each joined element pair in a tuple2: - * (first, second) - * To define a custom wrapping, use JoinedStream.apply(...) - */ - def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = { - val keyType = implicitly[TypeInformation[K]] - val cleanFun = op.input1.clean(fun) - val keyExtractor = new KeySelector[I2, K] { - def getKey(in: I2) = cleanFun(in) - } - finish(keyExtractor) - } - - private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = { - this.keys2 = keys2 - new JoinedStream[I1, I2](this, createJoinOperator()) - } - - private def createJoinOperator(): JavaStream[(I1, I2)] = { - -// val returnType = createTuple2TypeInformation[I1, I2](op.input1.getType, op.input2.getType) -// op.input1.keyBy(keys1).connect(op.input2.keyBy(keys2)) -// .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), -// returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) - null - } - } - - class JoinedStream[I1, I2]( - jp: JoinPredicate[I1, I2], - javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) { - - private val op = jp.op - - /** - * Sets a wrapper for the joined elements. For each joined pair, the result of the - * udf call will be emitted. 
- */ - def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = { - - val cleanFun = clean(getJoinWindowFunction(jp, fun)) - -// op.input1.keyBy(jp.keys1).connect(op.input2.keyBy(jp.keys2)) -// .addGeneralWindowCombine[R]( -// cleanFun, -// implicitly[TypeInformation[R]], -// op.windowSize, -// op.slideInterval, -// op.timeStamp1, -// op.timeStamp2) - null - } - } - - private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], - joinFunction: (I1, I2) => R) = { - require(joinFunction != null, "Join function must not be null.") - - val cleanFun = jp.op.input1.clean(joinFunction) - - val joinFun = new JoinFunction[I1, I2, R] { - override def join(first: I1, second: I2): R = { - cleanFun(first, second) - } - } - -// new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun) - null - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala deleted file mode 100644 index 8357c4d..0000000 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/TemporalOperator.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.scala - -import org.apache.flink.api.scala.ClosureCleaner -import org.apache.flink.streaming.api.datastream.temporal.{ TemporalOperator => JTempOp } -import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } -import org.apache.flink.streaming.api.datastream.temporal.TemporalWindow -import org.apache.flink.streaming.api.windowing.helper.Timestamp - -abstract class TemporalOperator[I1, I2, OP <: TemporalWindow[OP]]( - i1: JavaStream[I1], i2: JavaStream[I2]) extends JTempOp[I1, I2, OP](i1, i2) { - - def onWindow(length: Long, ts1: I1 => Long, ts2: I2 => Long, startTime: Long = 0): OP = { - val timeStamp1 = getTS(ts1) - val timeStamp2 = getTS(ts2) - onWindow(length, timeStamp1, timeStamp2, startTime) - } - - def getTS[R](ts: R => Long): Timestamp[R] = { - val cleanFun = clean(ts) - new Timestamp[R] { - def getTimestamp(in: R) = cleanFun(in) - } - } - - /** - * Returns a "closure-cleaned" version of the given function. 
Cleans only if closure cleaning - * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig} - */ - private[flink] def clean[F <: AnyRef](f: F): F = { - new StreamExecutionEnvironment(i1.getExecutionEnvironment).scalaClean(f) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala new file mode 100644 index 0000000..7232309 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import java.util.concurrent.TimeUnit + +import org.apache.flink.streaming.api.functions.TimestampExtractor +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Test +import org.junit.Assert._ + +import scala.collection.mutable + +class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { + + @Test + def testCoGroup(): Unit = { + CoGroupJoinITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.getConfig.enableTimestamps + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + } + + def cancel() { + } + }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor) + + val source2 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("b", 3)) + ctx.collect(("c", 6)) + ctx.collect(("c", 7)) + ctx.collect(("c", 8)) + } + + def cancel() { + } + }).extractTimestamp(new CoGroupJoinITCase.Tuple2TimestampExtractor) + + source1.coGroup(source2) + .where(_._1) + .equalTo(_._1) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply { (first: Iterator[(String, Int)], second: Iterator[(String, Int)]) => + "F:" + first.mkString("") + " S:" + second.mkString("") + } + .addSink(new 
SinkFunction[String]() { + def invoke(value: String) { + CoGroupJoinITCase.testResults += value + } + }) + + env.execute("CoGroup Test") + + val expectedResult = mutable.MutableList( + "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", + "F:(b,3)(b,4)(b,5) S:(b,3)", + "F:(a,6)(a,7)(a,8) S:", + "F: S:(c,6)(c,7)(c,8)") + + assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { + CoGroupJoinITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.getConfig.enableTimestamps + + val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { + ctx.collect(("a", "x", 0)) + ctx.collect(("a", "y", 1)) + ctx.collect(("a", "z", 2)) + + ctx.collect(("b", "u", 3)) + ctx.collect(("b", "w", 5)) + + ctx.collect(("a", "i", 6)) + ctx.collect(("a", "j", 7)) + ctx.collect(("a", "k", 8)) + } + + def cancel() { + } + }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor) + + val source2 = env.addSource(new SourceFunction[(String, String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { + ctx.collect(("a", "u", 0)) + ctx.collect(("a", "w", 1)) + + ctx.collect(("b", "i", 3)) + ctx.collect(("b", "k", 5)) + + ctx.collect(("a", "x", 6)) + ctx.collect(("a", "z", 8)) + } + + def cancel() { + } + }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor) + + source1.join(source2) + .where(_._1) + .equalTo(_._1) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply( (l, r) => l.toString + ":" + r.toString) + .addSink(new SinkFunction[String]() { + def invoke(value: String) { + CoGroupJoinITCase.testResults += value + } + }) + + env.execute("Join Test") + + val expectedResult = mutable.MutableList( + "(a,x,0):(a,u,0)", + "(a,x,0):(a,w,1)", + "(a,y,1):(a,u,0)", + "(a,y,1):(a,w,1)", + "(a,z,2):(a,u,0)", + 
"(a,z,2):(a,w,1)", + "(b,u,3):(b,i,3)", + "(b,u,3):(b,k,5)", + "(b,w,5):(b,i,3)", + "(b,w,5):(b,k,5)", + "(a,i,6):(a,x,6)", + "(a,i,6):(a,z,8)", + "(a,j,7):(a,x,6)", + "(a,j,7):(a,z,8)", + "(a,k,8):(a,x,6)", + "(a,k,8):(a,z,8)") + + assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) + } + + @Test + def testSelfJoin(): Unit = { + CoGroupJoinITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.getConfig.enableTimestamps + + val source1 = env.addSource(new SourceFunction[(String, String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) { + ctx.collect(("a", "x", 0)) + ctx.collect(("a", "y", 1)) + ctx.collect(("a", "z", 2)) + + ctx.collect(("b", "u", 3)) + ctx.collect(("b", "w", 5)) + + ctx.collect(("a", "i", 6)) + ctx.collect(("a", "j", 7)) + ctx.collect(("a", "k", 8)) + } + + def cancel() { + } + }).extractTimestamp(new CoGroupJoinITCase.Tuple3TimestampExtractor) + + source1.join(source1) + .where(_._1) + .equalTo(_._1) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply( (l, r) => l.toString + ":" + r.toString) + .addSink(new SinkFunction[String]() { + def invoke(value: String) { + CoGroupJoinITCase.testResults += value + } + }) + + env.execute("Self-Join Test") + + val expectedResult = mutable.MutableList( + "(a,x,0):(a,x,0)", + "(a,x,0):(a,y,1)", + "(a,x,0):(a,z,2)", + "(a,y,1):(a,x,0)", + "(a,y,1):(a,y,1)", + "(a,y,1):(a,z,2)", + "(a,z,2):(a,x,0)", + "(a,z,2):(a,y,1)", + "(a,z,2):(a,z,2)", + "(b,u,3):(b,u,3)", + "(b,u,3):(b,w,5)", + "(b,w,5):(b,u,3)", + "(b,w,5):(b,w,5)", + "(a,i,6):(a,i,6)", + "(a,i,6):(a,j,7)", + "(a,i,6):(a,k,8)", + "(a,j,7):(a,i,6)", + "(a,j,7):(a,j,7)", + "(a,j,7):(a,k,8)", + "(a,k,8):(a,i,6)", + "(a,k,8):(a,j,7)", + "(a,k,8):(a,k,8)") + + assertEquals(expectedResult.sorted, CoGroupJoinITCase.testResults.sorted) + } + +} + + +object CoGroupJoinITCase { + private var testResults: 
mutable.MutableList[String] = null + + private class Tuple2TimestampExtractor extends TimestampExtractor[(String, Int)] { + def extractTimestamp(element: (String, Int), currentTimestamp: Long): Long = { + element._2 + } + + def emitWatermark(element: (String, Int), currentTimestamp: Long): Long = { + element._2 - 1 + } + + def getCurrentWatermark: Long = { + Long.MinValue + } + } + + private class Tuple3TimestampExtractor extends TimestampExtractor[(String, String, Int)] { + def extractTimestamp(element: (String, String, Int), currentTimestamp: Long): Long = { + element._3 + } + + def emitWatermark(element: (String, String, Int), currentTimestamp: Long): Long = { + element._3 - 1 + } + + def getCurrentWatermark: Long = { + Long.MinValue + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala index d1fd233..c6bd87a 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala import java.lang.reflect.Method import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase -import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream} +import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, 
JoinedStreams} import scala.language.existentials @@ -138,14 +138,14 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { classOf[KeyedStream[_, _]]) checkMethods( - "StreamJoinOperator", "StreamJoinOperator", - classOf[org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator[_,_]], - classOf[StreamJoinOperator[_,_]]) + "JoinedStreams.WithWindow", "JoinedStreams.WithWindow", + classOf[org.apache.flink.streaming.api.datastream.JoinedStreams.WithWindow[_,_,_,_]], + classOf[JoinedStreams.WithWindow[_,_,_,_]]) checkMethods( - "TemporalOperator", "TemporalOperator", - classOf[org.apache.flink.streaming.api.datastream.temporal.TemporalOperator[_,_,_]], - classOf[TemporalOperator[_,_,_]]) + "CoGroupedStreams.WithWindow", "CoGroupedStreams.WithWindow", + classOf[org.apache.flink.streaming.api.datastream.CoGroupedStreams.WithWindow[_,_,_,_]], + classOf[CoGroupedStreams.WithWindow[_,_,_,_]]) checkMethods( "WindowedDataStream", "WindowedDataStream",
