[
https://issues.apache.org/jira/browse/FLINK-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376437#comment-14376437
]
ASF GitHub Bot commented on FLINK-1595:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/520#discussion_r26969789
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
---
@@ -0,0 +1,903 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.complex;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.WindowMapFunction;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import
org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Delta;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.RectangleClass;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ComplexIntegrationTest implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final long MEMORYSIZE = 32;
+
+ private static Map<String, List<String>> results = new HashMap<String,
List<String>>();
+
+ @SuppressWarnings("unchecked")
+ public static List<Tuple5<Integer, String, Character, Double, Boolean>>
input = Arrays.asList(
+ new Tuple5<Integer, String, Character, Double,
Boolean>(1, "apple", 'j', 0.1, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(1, "peach", 'b', 0.8, true),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(1, "orange", 'c', 0.7, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(2, "apple", 'd', 0.5, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(2, "apple", 'e', 0.6, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(3, "peach", 'a', 0.2, true),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(6, "peanut", 'b', 0.1, true),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(7, "banana", 'c', 0.4, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(8, "peanut", 'd', 0.2, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(10, "cherry", 'e', 0.1, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(10, "plum", 'a', 0.5, true),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(11, "strawberry", 'b', 0.3, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(11, "orange", 'c', 0.3, true),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(12, "lemon", 'd', 0.9, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(13, "apple", 'e', 0.7, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(15, "lemon", 'a', 0.2, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(16, "apple", 'b', 0.8, true),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(16, "banana", 'c', 0.8, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(16, "orange", 'd', 0.1, false),
+ new Tuple5<Integer, String, Character, Double,
Boolean>(17, "cherry", 'e', 1.0, false));
+
+// @Test
+// public void complexIntegrationTest() throws Exception {
+//
+// MyTimestamp timestamp = new MyTimestamp();
+// List<String> resultList1 = new ArrayList<String>();
+// //results.put("resultList")
+//
+// StreamExecutionEnvironment env = new TestStreamEnvironment(1,
MEMORYSIZE);
+// env.setBufferTimeout(10);
+//
+// DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 =
env.addSource(new TupleSource());
+// DataStream<OuterPojo> sourceStream2 = env.addSource(new
PojoSource());
+// DataStream<Long> sourceStream3 = env.generateSequence(1, 50);
+// DataStream<Tuple5<Integer, String, Character, Double, Boolean>>
sourceStream4 = env.fromCollection(input);
+// DataStream<Long> sourceStream5 = env.fromElements(10L, 20L,
30L, 40L, 50L, 60L, 70L, -10L, -20L, -30L);
+//
+// //noinspection unchecked
+// env.fromCollection(input)
+// .groupBy(1).sum(3)
+// .groupBy(2).window(Time.of(10, new
MyTimestamp(), 0)).every(Time.of(5, new MyTimestamp(), 0)).max(3)
+// .flatten()
+// .merge(sourceStream4.filter(new
MyFilterFunction()).distribute())
+// .map(new MapFunction<Tuple5<Integer, String,
Character, Double, Boolean>, Tuple4<Integer, String,
+// Double, Boolean>>() {
+//
+// @Override
+// public Tuple4<Integer, String, Double,
Boolean> map(Tuple5<Integer, String, Character, Double,
+// Boolean> value) throws
Exception {
+// return new Tuple4<Integer,
String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2, value
+// .f3,
+// value.f4);
+// }
+// }).broadcast().flatMap(new
MyFlatMapFunction()).connect(sourceStream2).map(new
+// MyCoMapFunction()).addSink(new
ToListSink(resultList1));
+//
+// IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it =
sourceStream1.sum(0).filter(new FilterFunction
+// <Tuple2<Long, Tuple2<String, Long>>>() {
+//
+// @Override
+// public boolean filter(Tuple2<Long, Tuple2<String,
Long>> value) throws Exception {
+// return value.f0 < 20;
+// }
+// }).iterate(1000);
+//
+// SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step =
it.map(new IncrementMap()).split(new
+// MyOutputSelector());
+// it.closeWith(step.select("iterate"));
+//
+// step.select("firstOutput").print();
+// step.select("secondOutput").print();
+//
+// IterativeDataStream<Tuple3<List<Tuple2<Double, String>>,
Integer, Double>> it2 = sourceStream4.project(3, 1)
+// .types(Double.class, String
+// .class).forward().map(new
MapFunction<Tuple2<Double, String>, Tuple3<List<Tuple2<Double, String>>,
+// Integer, Double>>(){
+//
+// @Override
+// public Tuple3<List<Tuple2<Double, String>>, Integer,
Double> map(Tuple2<Double, String> value) throws
+// Exception {
+// List<Tuple2<Double, String>> list = new
ArrayList<Tuple2<Double, String>>();
+// list.add(value);
+// return new Tuple3<List<Tuple2<Double, String>>,
Integer, Double>(list, 1, value.f0);
+// }
+// }).iterate(5000);
+//
+// SplitDataStream<Tuple3<List<Tuple2<Double, String>>, Integer,
Double>> boxing =
+// it2.window(Count.of(2))
+// .mapWindow(new
WindowMapFunction<Tuple3<List<Tuple2<Double, String>>, Integer, Double>,
+// Tuple3<List<Tuple2<Double,
String>>, Integer, Double>>() {
+//
+// @Override
+// public void
mapWindow(Iterable<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>
values,
+//
Collector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> out) throws
Exception {
+// List<Tuple2<Double, String>>
list = new ArrayList<Tuple2<Double, String>>();
+// int count = 0;
+// double quantity = 0.0;
+//
+// for (Tuple3<List<Tuple2<Double,
String>>, Integer, Double> value : values) {
+// list.addAll(value.f0);
+// count += value.f1;
+// quantity += value.f2;
+// }
+// out.collect(new
Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, count,
+// quantity));
+//
+// }
+// })
+//.flatten().split(new OutputSelector<Tuple3<List<Tuple2<Double, String>>,
Integer, Double>>(){
+//
+// @Override
+// public Iterable<String>
select(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value) {
+// List<String> output = new
ArrayList<String>();
+// if (value.f2 < 2) {
+// output.add("iterate");
+// } else {
+// output.add("output");
+// }
+// return output;
+// }
+// });
+//
+// it2.closeWith(boxing.select("iterate"));
+//
+// boxing.select("output").print();
+// boxing.select("iterate").print();
+//
+// //sourceStream1.project(0).types(Long.class).print();
+// //sourceStream2.groupBy("f0").sum("f1").print();
+//
+// env.execute();
+//
+// System.out.println(resultList1);
+// }
+
+ @Test
+ public void complexIntegrationTest2() throws Exception {
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1,
MEMORYSIZE);
+ env.setBufferTimeout(10);
+
+ DataStream<Tuple5<Integer, String, Character, Double, Boolean>>
sourceStream1 = env.fromCollection(input);
+ DataStream<OuterPojo> sourceStream2 = env.addSource(new
PojoSource());
+
+ //noinspection unchecked
+ sourceStream1.groupBy(1).sum(3)
+ .groupBy(2).window(Time.of(10, new
MyTimestamp(), 0)).every(Time.of(4, new MyTimestamp(), 0)).max(3)
+ .flatten()
+ .merge(sourceStream1.filter(new
MyFilterFunction()).distribute())
+ .map(new MapFunction<Tuple5<Integer, String,
Character, Double, Boolean>, Tuple4<Integer, String,
+ Double, Boolean>>() {
+
+ @Override
+ public Tuple4<Integer, String, Double,
Boolean> map(Tuple5<Integer, String, Character, Double,
+ Boolean> value) throws
Exception {
+ return new Tuple4<Integer,
String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2, value
+ .f3,
+ value.f4);
+ }
+ }).broadcast().flatMap(new
MyFlatMapFunction()).connect(sourceStream2).map(new
+ MyCoMapFunction()).addSink(new
ToListSink<String>("test2"));
+
+ env.execute();
+
+ String result = "water_melon-b, water_melon-b, water_melon-b,
water_melon-b, water_melon-b, water_melon-b, " +
+ "water_melon-b, water_melon-b, water_melon-b,
water_melon-b, water_melon-b, water_melon-b, " +
+ "water_melon-b, " +
+ "water_melon-b, water_melon-b, water_melon-b,
water_melon-b, water_melon-b, water_melon-b, " +
+ "water_melon-b, " +
+ "peach-b, peach-b, peach-a, peach-a, peach-a,
peach-a, peanut-b, peanut-b, plum-a, plum-a, orange-c, " +
+ "orange-c, " +
+ "orange-c, orange-c, apple-b, apple-b";
+
+ ArrayList<String> test2Results = new
ArrayList<String>(Arrays.asList(result.split(", ")));
+
+ Collections.sort(test2Results);
+ Collections.sort(results.get("test2"));
+ assertEquals(test2Results, results.get("test2"));
+ System.out.println(results.get("test2"));
+ }
+
+ @Test
+ public void complexIntegrationTest3() throws Exception {
+ ArrayList<String> expected1 = new ArrayList<String>();
+ for (int i = 0; i < 9; i++) {
+ expected1.add("(10,(a,1))");
+ }
+ ArrayList<String> expected2 = new ArrayList<String>();
+ for (int i = 0; i < 19; i++) {
+ expected2.add("(20,(a,1))");
+ }
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1,
MEMORYSIZE);
+ env.setBufferTimeout(10);
+
+ DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream =
env.addSource(new TupleSource());
+ IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it =
sourceStream.sum(0).filter(new FilterFunction
+ <Tuple2<Long, Tuple2<String, Long>>>() {
+
+ @Override
+ public boolean filter(Tuple2<Long, Tuple2<String,
Long>> value) throws Exception {
+ return value.f0 < 20;
+ }
+ }).iterate(1000);
+
+ SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step =
it.map(new IncrementMap()).split(new
+ MyOutputSelector());
+ it.closeWith(step.select("iterate"));
+
+ step.select("firstOutput").addSink(new ToListSink<Tuple2<Long,
Tuple2<String, Long>>>("test31"));
+ step.select("secondOutput").addSink(new ToListSink<Tuple2<Long,
Tuple2<String, Long>>>("test32"));
+
+ env.execute();
+
+ assertEquals(expected1, results.get("test31"));
+ assertEquals(expected2, results.get("test32"));
+
+ }
+
+// non functioning
+// @Test
+// public void complexIntegrationTest4() throws Exception {
+// StreamExecutionEnvironment env = new TestStreamEnvironment(1,
MEMORYSIZE);
+// env.setBufferTimeout(10);
+//
+// DataStream<Tuple5<Integer, String, Character, Double, Boolean>>
sourceStream = env.fromCollection(input);
+//
+// IterativeDataStream<Tuple3<List<Tuple2<Double, String>>,
Integer, Double>> it2 = sourceStream.project(3, 1)
+// .types(Double.class, String
+// .class).forward().map(new
MapFunction<Tuple2<Double, String>, Tuple3<List<Tuple2<Double, String>>,
+// Integer, Double>>(){
+//
+// @Override
+// public Tuple3<List<Tuple2<Double, String>>, Integer,
Double> map(Tuple2<Double, String> value) throws
+// Exception {
+// List<Tuple2<Double, String>> list = new
ArrayList<Tuple2<Double, String>>();
+// list.add(value);
+// return new Tuple3<List<Tuple2<Double, String>>,
Integer, Double>(list, 1, value.f0);
+// }
+// }).iterate(5000);
+//
+// SplitDataStream<Tuple3<List<Tuple2<Double, String>>, Integer,
Double>> boxing =
+// it2.window(Count.of(2))
+// .mapWindow(new
WindowMapFunction<Tuple3<List<Tuple2<Double, String>>, Integer, Double>,
+//
Tuple3<List<Tuple2<Double, String>>, Integer, Double>>() {
+//
+// @Override
+// public void
mapWindow(Iterable<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>
+// values,
+//
Collector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> out) throws
+// Exception {
+//
List<Tuple2<Double, String>> list = new ArrayList<Tuple2<Double, String>>();
+// int count = 0;
+// double quantity
= 0.0;
+//
+// for
(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value : values) {
+//
list.addAll(value.f0);
+// count
+= value.f1;
+//
quantity += value.f2;
+// }
+// out.collect(new
Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, count,
+//
quantity));
+//
+// }
+// })
+// .flatten().split(new
OutputSelector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>(){
+//
+// @Override
+// public Iterable<String>
select(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value) {
+// List<String> output = new
ArrayList<String>();
+// if (value.f2 < 2) {
+// output.add("iterate");
+// } else {
+// output.add("output");
+// }
+// return output;
+// }
+// });
+//
+// it2.closeWith(boxing.select("iterate"));
+//
+// boxing.select("output").print();
+//
+// env.execute();
+// }
+
+ @Test
+ public void complexIntegrationTest5() throws Exception {
+ ArrayList<String> expected = new ArrayList<String>();
+ for (int i = 0; i < 20; i++) {
+ expected.add("((" + i / 2 + ",water_melon-b,2,pojo)," +
(i % 2 + 1) + ")");
+ expected.add("((1,a,1,tuple)," + (i + 1) + ")");
+ }
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1,
MEMORYSIZE);
+ env.setBufferTimeout(10);
+
+ DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 =
env.addSource(new TupleSource());
+ DataStream<OuterPojo> sourceStream2 = env.addSource(new
PojoSource());
+
+
sourceStream1.shuffle().connect(sourceStream2.broadcast()).map(new
StringifyCoMapFunction()).map(new
+
MapFunction<String, Tuple2<String, Short>>() {
+
+ @Override
+ public Tuple2<String, Short> map(String value) throws
Exception {
+ return new Tuple2<String, Short>(value, (short)
1);
+ }
+
+ }).groupBy(0).sum(1).addSink(new ToListSink<Tuple2<String,
Short>>("test5"));
+
+ env.execute();
+
+ Collections.sort(expected);
+ Collections.sort(results.get("test5"));
+
+ assertEquals(expected, results.get("test5"));
+ }
+
+ @Test
+ public void complexIntegrationTest6() throws Exception {
+ //heavy traffic with maps and flatmaps
+
+ ArrayList<String> expected1 = new ArrayList<String>();
+ expected1.add("541");
+ expected1.add("1223");
+ expected1.add("1987");
+ expected1.add("2741");
+ expected1.add("10939");
+ expected1.add("3571");
+ expected1.add("4409");
+ expected1.add("11927");
+ expected1.add("5279");
+ expected1.add("6133");
+ expected1.add("12823");
+ expected1.add("6997");
+ expected1.add("7919");
+ expected1.add("13763");
+ expected1.add("8831");
+ expected1.add("9733");
+ expected1.add("14759");
+ expected1.add("9973");
+ expected1.add("15671");
+ expected1.add("16673");
+ expected1.add("17659");
+ expected1.add("18617");
+ expected1.add("19697");
+ expected1.add("19997");
+
+ ArrayList<String> expected2 = new ArrayList<String>();
+ for (int i = 2; i < 100; i++) {
+ expected2.add("(" + i + "," + 20000 / i + ")");
+ }
+ for (int i = 19901; i <= 20000; i++) {
+ expected2.add("(" + i + "," + 20000 / i + ")");
+ }
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1,
MEMORYSIZE);
+ env.setBufferTimeout(1);
+
+ DataStream<Long> sourceStream1 = env.generateSequence(1, 10000);
+ DataStream<Long> sourceStream2 = env.generateSequence(10001,
20000);
+
+ //noinspection unchecked
+ sourceStream1.filter(new PrimeFilterFunction())
+ .window(Count.of(100))
+ .max(0).flatten()
+ .merge(sourceStream2.filter(new
PrimeFilterFunction())
+ .window(Count.of(100))
+ .max(0).flatten())
+ .addSink(new ToListSink<Long>("test61"));
+ sourceStream1.flatMap(new DivisorsFlatMapFunction())
+ .merge(sourceStream2.flatMap(new
DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
+ Integer>>() {
+
+ @Override
+ public Tuple2<Long, Integer> map(Long value) throws
Exception {
+ return new Tuple2<Long, Integer>(value, 1);
+ }
+ })
+ .groupBy(0)
+ .sum(1)
+ .groupBy(0)
+ .window(Count.of(10000)).max(1).flatten()
+ .filter(new FilterFunction<Tuple2<Long,
Integer>>() {
+
+ @Override
+ public boolean filter(Tuple2<Long,
Integer> value) throws Exception {
+ return value.f0 < 100 ||
value.f0 > 19900;
+ }
+ })
+ .addSink(new ToListSink<Tuple2<Long,
Integer>>("test62"));
+
+ env.execute();
+
+ Collections.sort(expected1);
+ Collections.sort(expected2);
+ Collections.sort(results.get("test61"));
+ Collections.sort(results.get("test62"));
+
+ assertEquals(expected1, results.get("test61"));
+ assertEquals(expected2, results.get("test62"));
+
+ }
+
+ @Test
+ public void complexIntegrationTest7() throws Exception {
+
+ ArrayList<String> expected = new ArrayList<String>();
+ expected.add("((100,100),0)");
+ expected.add("((120,122),5)");
+ expected.add("((121,125),6)");
+ expected.add("((138,144),9)");
+ expected.add("((139,147),10)");
+ expected.add("((156,166),13)");
+ expected.add("((157,169),14)");
+ expected.add("((174,188),17)");
+ expected.add("((175,191),18)");
+ expected.add("((192,210),21)");
+ expected.add("((193,213),22)");
+ expected.add("((210,232),25)");
+ expected.add("((211,235),26)");
+ expected.add("((228,254),29)");
+ expected.add("((229,257),30)");
+ expected.add("((246,276),33)");
+ expected.add("((247,279),34)");
+ expected.add("((264,298),37)");
+ expected.add("((265,301),38)");
+ expected.add("((282,320),41)");
+ expected.add("((283,323),42)");
+ expected.add("((300,342),45)");
+ expected.add("((301,345),46)");
+ expected.add("((318,364),49)");
+ expected.add("((319,367),50)");
+ expected.add("((336,386),53)");
+ expected.add("((337,389),54)");
+ expected.add("((354,408),57)");
+ expected.add("((355,411),58)");
+ expected.add("((372,430),61)");
+ expected.add("((373,433),62)");
+ expected.add("((390,452),65)");
+ expected.add("((391,455),66)");
+ expected.add("((408,474),69)");
+ expected.add("((409,477),70)");
+ expected.add("((426,496),73)");
+ expected.add("((427,499),74)");
+ expected.add("((444,518),77)");
+ expected.add("((445,521),78)");
+ expected.add("((462,540),81)");
+ expected.add("((463,543),82)");
+ expected.add("((480,562),85)");
+ expected.add("((481,565),86)");
+ expected.add("((498,584),89)");
+ expected.add("((499,587),90)");
+ expected.add("((516,606),93)");
+ expected.add("((517,609),94)");
+ expected.add("((534,628),97)");
+ expected.add("((535,631),98)");
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1,
32);
+ env.setBufferTimeout(100);
+
+ env.addSource(new RectangleSource()).global()
+ .map(new RectangleMapFunction())
+ .window(Delta.of(0.0, new
DeltaFunction<Tuple2<RectangleClass, Integer>>() {
+
+ @Override
+ public double
getDelta(Tuple2<RectangleClass, Integer> oldDataPoint, Tuple2<RectangleClass,
+ Integer> newDataPoint) {
+ return (newDataPoint.f0.b -
newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
+ }
+ }, new Tuple2<RectangleClass, Integer>(new
RectangleClass(100, 100), 0)))
+ .mapWindow(new MyWindowMapFunction())
+ .flatten()
+ .addSink(new ToListSink<Tuple2<RectangleClass,
Integer>>("test7"));
+
+ env.execute();
+
+ Collections.sort(expected);
+ Collections.sort(results.get("test7"));
+ assertEquals(expected, results.get("test7"));
+ }
+
+ @Test
+ public void complexIntegrationTest8() throws Exception {
+
+ ArrayList<String> expected = new ArrayList<String>();
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(1,
32);
+ env.setBufferTimeout(1);
+
+ SplitDataStream<Long> splittedStream1 = env.generateSequence(1,
10)
+ .map(new MapFunction<Long, Long>() {
+
+ @Override
+ public Long map(Long value) throws
Exception {
+ return value;
+ }
+ })
+ .filter(new FilterFunction<Long>() {
+
+ @Override
+ public boolean filter(Long value)
throws Exception {
+ return true;
+ }
+ })
+ .flatMap(new SquareFlatMapFunction())
+ .split(new OutputSelector<Long>() {
+
+ @Override
+ public Iterable<String> select(Long
value) {
+ ArrayList<String> output = new
ArrayList<String>();
+ if(value < 8) {
+ output.add("first");
+ } else {
+ output.add("second");
+ }
+ return output;
+ }
+ });
+
+ //splittedStream1.select("second").print();
+
+ SplitDataStream<Long> splittedStream2 =
splittedStream1.select("first").split(new OutputSelector<Long>() {
+
+ @Override
+ public Iterable<String> select(Long value) {
+ ArrayList<String> output = new
ArrayList<String>();
+ if(value < 5) {
+ output.add("third");
+ } else {
+ output.add("fourth");
+ }
+ return output;
+ }
+ });
+
+ DataStream<Long> dataStream3 =
splittedStream1.select("second").map(new MapFunction<Long, Long>() {
+
+ @Override
+ public Long map(Long value) throws Exception {
+ return value;
+ }
+ });
+
+ dataStream3.print();
+ splittedStream2.select("third").print();
+ // BUG!!!
+ dataStream3.merge(splittedStream2.select("third")).print();
+
+ //splittedStream2.select("fourth").print();
+
+ //.addSink(new ToListSink<Long>("test8"));
+
+ env.execute();
+
+ //Collections.sort(expected);
+ //Collections.sort(results.get("test8"));
+ //assertEquals(expected, results.get("test8"));
+ }
+
+ public static class InnerPojo {
+ public Long f0;
--- End diff --
Flink should also support stuff like java collections, date-types etc.
We frequently see users on the ML using that stuff. If its easy to
integrate into the test, I would suggest using such a type as well.
> Add a complex integration test for Streaming API
> ------------------------------------------------
>
> Key: FLINK-1595
> URL: https://issues.apache.org/jira/browse/FLINK-1595
> Project: Flink
> Issue Type: Task
> Components: Streaming
> Reporter: Gyula Fora
> Assignee: Péter Szabó
> Labels: Starter
>
> The streaming tests currently lack a sophisticated integration test that
> would test many api features at once.
> This should include different merging, partitioning, grouping, aggregation
> types, as well as windowing and connected operators.
> The results should be tested for correctness.
> A test like this would help identifying bugs that are hard to detect by
> unit-tests.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)