Move CoGroupJoinITCase to windowing package
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce792b11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce792b11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce792b11 Branch: refs/heads/master Commit: ce792b11a4a2b8d7b02a59dcadcc4c052ff5531a Parents: ff367d6 Author: Aljoscha Krettek <[email protected]> Authored: Wed Oct 7 16:18:24 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 7 22:08:25 2015 +0200 ---------------------------------------------------------------------- .../flink/streaming/api/CoGroupJoinITCase.java | 372 ------------------ .../operators/windowing/CoGroupJoinITCase.java | 373 +++++++++++++++++++ 2 files changed, 373 insertions(+), 372 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ce792b11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java deleted file mode 100644 index 9ddd6eb..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java +++ /dev/null @@ -1,372 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.flink.streaming.api; - -import com.google.common.collect.Lists; -import org.apache.flink.api.common.functions.CoGroupFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.TimestampExtractor; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { - - private static List<String> testResults; - - @Test - public void testCoGroup() throws Exception { - - testResults = Lists.newArrayList(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(1); - - DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { - ctx.collect(Tuple2.of("a", 0)); - ctx.collect(Tuple2.of("a", 1)); - ctx.collect(Tuple2.of("a", 2)); - - ctx.collect(Tuple2.of("b", 3)); - ctx.collect(Tuple2.of("b", 4)); - ctx.collect(Tuple2.of("b", 5)); - - ctx.collect(Tuple2.of("a", 6)); - ctx.collect(Tuple2.of("a", 7)); - ctx.collect(Tuple2.of("a", 8)); - } - - @Override - public void cancel() { - } - }).extractTimestamp(new Tuple2TimestampExtractor()); - - DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { - ctx.collect(Tuple2.of("a", 0)); - ctx.collect(Tuple2.of("a", 1)); - - ctx.collect(Tuple2.of("b", 3)); - - ctx.collect(Tuple2.of("c", 6)); - ctx.collect(Tuple2.of("c", 7)); - ctx.collect(Tuple2.of("c", 8)); - } - - @Override - public void cancel() { - } - }).extractTimestamp(new Tuple2TimestampExtractor()); - - - source1.coGroup(source2) - .where(new Tuple2KeyExtractor()) - .equalTo(new Tuple2KeyExtractor()) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() { - @Override - public void coGroup(Iterable<Tuple2<String, Integer>> first, - Iterable<Tuple2<String, Integer>> second, - Collector<String> out) throws Exception { - StringBuilder result = new StringBuilder(); - result.append("F:"); - for (Tuple2<String, Integer> t: first) { - result.append(t.toString()); - } - result.append(" S:"); - for (Tuple2<String, Integer> t: second) { - result.append(t.toString()); - } - out.collect(result.toString()); - } - }) - .addSink(new SinkFunction<String>() { - @Override - public void invoke(String value) throws Exception { - testResults.add(value); - } - }); - - env.execute("CoGroup Test"); - - List<String> expectedResult = Lists.newArrayList( - "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", - "F:(b,3)(b,4)(b,5) S:(b,3)", - "F:(a,6)(a,7)(a,8) S:", - "F: S:(c,6)(c,7)(c,8)"); - - Collections.sort(expectedResult); - Collections.sort(testResults); - - Assert.assertEquals(expectedResult, testResults); - } - - @Test - public void testJoin() throws Exception { - - testResults = Lists.newArrayList(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(1); - - DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { - ctx.collect(Tuple3.of("a", "x", 0)); - ctx.collect(Tuple3.of("a", "y", 1)); - ctx.collect(Tuple3.of("a", "z", 2)); - - ctx.collect(Tuple3.of("b", "u", 3)); - ctx.collect(Tuple3.of("b", "w", 5)); - - ctx.collect(Tuple3.of("a", "i", 6)); - ctx.collect(Tuple3.of("a", "j", 7)); - ctx.collect(Tuple3.of("a", "k", 8)); - } - - @Override - public void cancel() { - } - }).extractTimestamp(new Tuple3TimestampExtractor()); - - DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { - ctx.collect(Tuple3.of("a", "u", 0)); - ctx.collect(Tuple3.of("a", "w", 1)); - - ctx.collect(Tuple3.of("b", "i", 3)); - ctx.collect(Tuple3.of("b", "k", 5)); - - ctx.collect(Tuple3.of("a", "x", 6)); - ctx.collect(Tuple3.of("a", "z", 8)); - } - - @Override - public void cancel() { - } - }).extractTimestamp(new Tuple3TimestampExtractor()); - - - source1.join(source2) - .where(new Tuple3KeyExtractor()) - .equalTo(new Tuple3KeyExtractor()) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() { - @Override - public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception { - return first + ":" + second; - } - }) - .addSink(new SinkFunction<String>() { - @Override - public void invoke(String value) throws Exception { - testResults.add(value); - } - }); - - env.execute("Join Test"); - - List<String> expectedResult = Lists.newArrayList( - "(a,x,0):(a,u,0)", - "(a,x,0):(a,w,1)", - "(a,y,1):(a,u,0)", - "(a,y,1):(a,w,1)", - "(a,z,2):(a,u,0)", - "(a,z,2):(a,w,1)", - "(b,u,3):(b,i,3)", - "(b,u,3):(b,k,5)", - "(b,w,5):(b,i,3)", - "(b,w,5):(b,k,5)", - "(a,i,6):(a,x,6)", - "(a,i,6):(a,z,8)", - "(a,j,7):(a,x,6)", - "(a,j,7):(a,z,8)", - "(a,k,8):(a,x,6)", - "(a,k,8):(a,z,8)"); - - Collections.sort(expectedResult); - Collections.sort(testResults); - - Assert.assertEquals(expectedResult, testResults); - } - - @Test - public void testSelfJoin() throws Exception { - - testResults = Lists.newArrayList(); - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(1); - - DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { - ctx.collect(Tuple3.of("a", "x", 0)); - ctx.collect(Tuple3.of("a", "y", 1)); - ctx.collect(Tuple3.of("a", "z", 2)); - - ctx.collect(Tuple3.of("b", "u", 3)); - ctx.collect(Tuple3.of("b", "w", 5)); - - ctx.collect(Tuple3.of("a", "i", 6)); - ctx.collect(Tuple3.of("a", "j", 7)); - ctx.collect(Tuple3.of("a", "k", 8)); - } - - @Override - public void cancel() { - } - }).extractTimestamp(new Tuple3TimestampExtractor()); - - source1.join(source1) - .where(new Tuple3KeyExtractor()) - .equalTo(new Tuple3KeyExtractor()) - .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) - .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() { - @Override - public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception { - return first + ":" + second; - } - }) - .addSink(new SinkFunction<String>() { - @Override - public void invoke(String value) throws Exception { - testResults.add(value); - } - }); - - env.execute("Self-Join Test"); - - List<String> expectedResult = Lists.newArrayList( - "(a,x,0):(a,x,0)", - "(a,x,0):(a,y,1)", - "(a,x,0):(a,z,2)", - "(a,y,1):(a,x,0)", - "(a,y,1):(a,y,1)", - "(a,y,1):(a,z,2)", - "(a,z,2):(a,x,0)", - "(a,z,2):(a,y,1)", - "(a,z,2):(a,z,2)", - "(b,u,3):(b,u,3)", - "(b,u,3):(b,w,5)", - "(b,w,5):(b,u,3)", - "(b,w,5):(b,w,5)", - "(a,i,6):(a,i,6)", - "(a,i,6):(a,j,7)", - "(a,i,6):(a,k,8)", - "(a,j,7):(a,i,6)", - "(a,j,7):(a,j,7)", - "(a,j,7):(a,k,8)", - "(a,k,8):(a,i,6)", - "(a,k,8):(a,j,7)", - "(a,k,8):(a,k,8)"); - - Collections.sort(expectedResult); - Collections.sort(testResults); - - Assert.assertEquals(expectedResult, testResults); - } - - private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) { - return element.f1; - } - - @Override - public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) { - return element.f1 - 1; - } - - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } - } - - private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) { - return element.f2; - } - - @Override - public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) { - return element.f2 - 1; - } - - @Override - public long getCurrentWatermark() { - return Long.MIN_VALUE; - } - } - - private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> { - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple2<String, Integer> value) throws Exception { - return value.f0; - } - } - - private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> { - private static final long serialVersionUID = 1L; - - @Override - public String getKey(Tuple3<String, String, Integer> value) throws Exception { - return value.f0; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/ce792b11/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java new file mode 100644 index 0000000..bb79e5e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java @@ -0,0 +1,373 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.streaming.runtime.operators.windowing; + +import com.google.common.collect.Lists; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { + + private static List<String> testResults; + + @Test + public void testCoGroup() throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + ctx.collect(Tuple2.of("a", 2)); + + ctx.collect(Tuple2.of("b", 3)); + ctx.collect(Tuple2.of("b", 4)); + ctx.collect(Tuple2.of("b", 5)); + + ctx.collect(Tuple2.of("a", 6)); + ctx.collect(Tuple2.of("a", 7)); + ctx.collect(Tuple2.of("a", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple2TimestampExtractor()); + + DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + + ctx.collect(Tuple2.of("b", 3)); + + ctx.collect(Tuple2.of("c", 6)); + ctx.collect(Tuple2.of("c", 7)); + ctx.collect(Tuple2.of("c", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple2TimestampExtractor()); + + + source1.coGroup(source2) + .where(new Tuple2KeyExtractor()) + .equalTo(new Tuple2KeyExtractor()) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() { + @Override + public void coGroup(Iterable<Tuple2<String, Integer>> first, + Iterable<Tuple2<String, Integer>> second, + Collector<String> out) throws Exception { + StringBuilder result = new StringBuilder(); + result.append("F:"); + for (Tuple2<String, Integer> t: first) { + result.append(t.toString()); + } + result.append(" S:"); + for (Tuple2<String, Integer> t: second) { + result.append(t.toString()); + } + out.collect(result.toString()); + } + }) + .addSink(new SinkFunction<String>() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("CoGroup Test"); + + List<String> expectedResult = Lists.newArrayList( + "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", + "F:(b,3)(b,4)(b,5) S:(b,3)", + "F:(a,6)(a,7)(a,8) S:", + "F: S:(c,6)(c,7)(c,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + @Test + public void testJoin() throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("a", "y", 1)); + ctx.collect(Tuple3.of("a", "z", 2)); + + ctx.collect(Tuple3.of("b", "u", 3)); + ctx.collect(Tuple3.of("b", "w", 5)); + + ctx.collect(Tuple3.of("a", "i", 6)); + ctx.collect(Tuple3.of("a", "j", 7)); + ctx.collect(Tuple3.of("a", "k", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple3TimestampExtractor()); + + DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "u", 0)); + ctx.collect(Tuple3.of("a", "w", 1)); + + ctx.collect(Tuple3.of("b", "i", 3)); + ctx.collect(Tuple3.of("b", "k", 5)); + + ctx.collect(Tuple3.of("a", "x", 6)); + ctx.collect(Tuple3.of("a", "z", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple3TimestampExtractor()); + + + source1.join(source2) + .where(new Tuple3KeyExtractor()) + .equalTo(new Tuple3KeyExtractor()) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() { + @Override + public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction<String>() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Join Test"); + + List<String> expectedResult = Lists.newArrayList( + "(a,x,0):(a,u,0)", + "(a,x,0):(a,w,1)", + "(a,y,1):(a,u,0)", + "(a,y,1):(a,w,1)", + "(a,z,2):(a,u,0)", + "(a,z,2):(a,w,1)", + "(b,u,3):(b,i,3)", + "(b,u,3):(b,k,5)", + "(b,w,5):(b,i,3)", + "(b,w,5):(b,k,5)", + "(a,i,6):(a,x,6)", + "(a,i,6):(a,z,8)", + "(a,j,7):(a,x,6)", + "(a,j,7):(a,z,8)", + "(a,k,8):(a,x,6)", + "(a,k,8):(a,z,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + @Test + public void testSelfJoin() throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.setParallelism(1); + + DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("a", "y", 1)); + ctx.collect(Tuple3.of("a", "z", 2)); + + ctx.collect(Tuple3.of("b", "u", 3)); + ctx.collect(Tuple3.of("b", "w", 5)); + + ctx.collect(Tuple3.of("a", "i", 6)); + ctx.collect(Tuple3.of("a", "j", 7)); + ctx.collect(Tuple3.of("a", "k", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple3TimestampExtractor()); + + source1.join(source1) + .where(new Tuple3KeyExtractor()) + .equalTo(new Tuple3KeyExtractor()) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() { + @Override + public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction<String>() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Self-Join Test"); + + List<String> expectedResult = Lists.newArrayList( + "(a,x,0):(a,x,0)", + "(a,x,0):(a,y,1)", + "(a,x,0):(a,z,2)", + "(a,y,1):(a,x,0)", + "(a,y,1):(a,y,1)", + "(a,y,1):(a,z,2)", + "(a,z,2):(a,x,0)", + "(a,z,2):(a,y,1)", + "(a,z,2):(a,z,2)", + "(b,u,3):(b,u,3)", + "(b,u,3):(b,w,5)", + "(b,w,5):(b,u,3)", + "(b,w,5):(b,w,5)", + "(a,i,6):(a,i,6)", + "(a,i,6):(a,j,7)", + "(a,i,6):(a,k,8)", + "(a,j,7):(a,i,6)", + "(a,j,7):(a,j,7)", + "(a,j,7):(a,k,8)", + "(a,k,8):(a,i,6)", + "(a,k,8):(a,j,7)", + "(a,k,8):(a,k,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) { + return element.f1; + } + + @Override + public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) { + return element.f1 - 1; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + } + + private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) { + return element.f2; + } + + @Override + public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) { + return element.f2 - 1; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + } + + private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + } + + private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple3<String, String, Integer> value) throws Exception { + return value.f0; + } + } + +}
