This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d4386e6 [FLINK-19640] Add an IT case for bounded execution
d4386e6 is described below
commit d4386e6ad211153c910bf55750cf13a23073b1cd
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Oct 16 09:56:21 2020 +0200
[FLINK-19640] Add an IT case for bounded execution
This closes #13666.
---
.../runtime/SortingBoundedInputITCase.java | 381 ++++++++++++++++-----
1 file changed, 300 insertions(+), 81 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
index 7246418..262780a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
@@ -18,19 +18,30 @@
package org.apache.flink.test.streaming.runtime;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
@@ -43,45 +54,47 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
-import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
-import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
-import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.SplittableIterator;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
-import java.util.UUID;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
/**
* An end to end test for sorted inputs for a keyed operator with bounded inputs.
*/
public class SortingBoundedInputITCase extends AbstractTestBase {
+
@Test
- public void testOneInputOperator() throws Exception {
+ public void testOneInputOperator() {
long numberOfRecords = 1_000_000;
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ env.configure(config, this.getClass().getClassLoader());
+
DataStreamSource<Tuple2<Integer, byte[]>> elements =
env.fromParallelCollection(
new InputGenerator(numberOfRecords),
new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
@@ -95,21 +108,7 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
new AssertingOperator()
);
- // TODO we should replace this block with
DataStreamUtils#collect once
- // we have the automatic runtime mode determination in place.
- CollectResultIterator<Long> collectedCounts = applyCollect(env,
counts);
- StreamGraph streamGraph = env.getStreamGraph();
- streamGraph.getStreamNode(counts.getId()).setSortedInputs(true);
- Map<ManagedMemoryUseCase, Integer> operatorMemory = new
HashMap<>();
- operatorMemory.put(ManagedMemoryUseCase.BATCH_OP, 1);
-
streamGraph.getStreamNode(counts.getId()).setManagedMemoryUseCaseWeights(
- operatorMemory,
- Collections.emptySet()
- );
- JobClient jobClient = env.executeAsync(streamGraph);
- collectedCounts.setJobClient(jobClient);
-
- long sum = CollectionUtil.iteratorToList(collectedCounts)
+ long sum =
CollectionUtil.iteratorToList(DataStreamUtils.collect(counts))
.stream()
.mapToLong(l -> l)
.sum();
@@ -118,9 +117,14 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
}
@Test
- public void testTwoInputOperator() throws Exception {
+ public void testTwoInputOperator() {
long numberOfRecords = 500_000;
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ env.configure(config, this.getClass().getClassLoader());
+
DataStreamSource<Tuple2<Integer, byte[]>> elements1 =
env.fromParallelCollection(
new InputGenerator(numberOfRecords),
new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
@@ -141,21 +145,7 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
new AssertingTwoInputOperator()
);
- // TODO we should replace this block with
DataStreamUtils#collect once
- // we have the automatic runtime mode determination in place.
- CollectResultIterator<Long> collectedCounts = applyCollect(env,
counts);
- StreamGraph streamGraph = env.getStreamGraph();
- streamGraph.getStreamNode(counts.getId()).setSortedInputs(true);
- Map<ManagedMemoryUseCase, Integer> operatorMemory = new
HashMap<>();
- operatorMemory.put(ManagedMemoryUseCase.BATCH_OP, 1);
-
streamGraph.getStreamNode(counts.getId()).setManagedMemoryUseCaseWeights(
- operatorMemory,
- Collections.emptySet()
- );
- JobClient jobClient = env.executeAsync(streamGraph);
- collectedCounts.setJobClient(jobClient);
-
- long sum = CollectionUtil.iteratorToList(collectedCounts)
+ long sum =
CollectionUtil.iteratorToList(DataStreamUtils.collect(counts))
.stream()
.mapToLong(l -> l)
.sum();
@@ -164,9 +154,14 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
}
@Test
- public void testThreeInputOperator() throws Exception {
+ public void testThreeInputOperator() {
long numberOfRecords = 500_000;
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ env.configure(config, this.getClass().getClassLoader());
+
KeyedStream<Tuple2<Integer, byte[]>, Object> elements1 =
env.fromParallelCollection(
new InputGenerator(numberOfRecords),
new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
@@ -196,21 +191,7 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
env.addOperator(assertingTransformation);
DataStream<Long> counts = new DataStream<>(env,
assertingTransformation);
- // TODO we should replace this block with
DataStreamUtils#collect once
- // we have the automatic runtime mode determination in place.
- CollectResultIterator<Long> collectedCounts = applyCollect(env,
counts);
- StreamGraph streamGraph = env.getStreamGraph();
-
streamGraph.getStreamNode(assertingTransformation.getId()).setSortedInputs(true);
- Map<ManagedMemoryUseCase, Integer> operatorMemory = new
HashMap<>();
- operatorMemory.put(ManagedMemoryUseCase.BATCH_OP, 1);
-
streamGraph.getStreamNode(counts.getId()).setManagedMemoryUseCaseWeights(
- operatorMemory,
- Collections.emptySet()
- );
- JobClient jobClient = env.executeAsync(streamGraph);
- collectedCounts.setJobClient(jobClient);
-
- long sum = CollectionUtil.iteratorToList(collectedCounts)
+ long sum =
CollectionUtil.iteratorToList(DataStreamUtils.collect(counts))
.stream()
.mapToLong(l -> l)
.sum();
@@ -218,24 +199,262 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
assertThat(sum, equalTo(numberOfRecords * 3));
}
- private CollectResultIterator<Long>
applyCollect(StreamExecutionEnvironment env, DataStream<Long> counts) {
- String accumulatorName = "dataStreamCollect_" +
UUID.randomUUID().toString();
-
- CollectSinkOperatorFactory<Long> factory = new
CollectSinkOperatorFactory<>(
- new LongSerializer(),
- accumulatorName);
- CollectSinkOperator<Long> operator =
(CollectSinkOperator<Long>) factory.getOperator();
- CollectStreamSink<Long> sink = new CollectStreamSink<>(counts,
factory);
- sink.name("Data stream collect sink");
- env.addOperator(sink.getTransformation());
-
- return new CollectResultIterator<>(
- operator.getOperatorIdFuture(),
- new LongSerializer(),
- accumulatorName,
- env.getCheckpointConfig());
+ @Test
+ public void testBatchExecutionWithTimersOneInput() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1); // set parallelism to 1 to have
consistent order of results
+
+ Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ env.configure(config, this.getClass().getClassLoader());
+
+ WatermarkStrategy<Tuple2<Integer, Integer>> watermarkStrategy =
+ WatermarkStrategy.forGenerator(
+ ctx -> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP
+ ).withTimestampAssigner(
+ (r, previousTimestamp) -> r.f1
+ );
+ SingleOutputStreamOperator<Tuple2<Integer, Integer>> elements =
env.fromElements(
+ Tuple2.of(1, 3),
+ Tuple2.of(1, 1),
+ Tuple2.of(2, 1),
+ Tuple2.of(1, 4),
+ // emit watermark = 5
+ Tuple2.of(2, 3), // late element
+ Tuple2.of(1, 2), // late element
+ Tuple2.of(1, 13),
+ Tuple2.of(1, 11),
+ Tuple2.of(2, 14),
+ // emit watermark = 15
+ Tuple2.of(1, 11) // late element
+ ).assignTimestampsAndWatermarks(watermarkStrategy);
+
+ OutputTag<Integer> lateElements = new
OutputTag<>("late_elements", BasicTypeInfo.INT_TYPE_INFO);
+ SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> sums
= elements
+ .map(element -> element.f0)
+ .keyBy(element -> element)
+ .process(new KeyedProcessFunction<Integer, Integer,
Tuple3<Long, Integer, Integer>>() {
+
+ private MapState<Long, Integer> countState;
+ private ValueState<Long> previousTimestampState;
+
+ @Override
+ public void open(Configuration parameters) {
+ countState =
getRuntimeContext().getMapState(
+ new MapStateDescriptor<>(
+ "sum",
+
BasicTypeInfo.LONG_TYPE_INFO,
+
BasicTypeInfo.INT_TYPE_INFO)
+ );
+ previousTimestampState =
getRuntimeContext().getState(
+ new ValueStateDescriptor<>(
+ "previousTimestamp",
+
BasicTypeInfo.LONG_TYPE_INFO
+ )
+ );
+ }
+
+ @Override
+ public void processElement(
+ Integer value,
+ Context ctx,
+ Collector<Tuple3<Long, Integer,
Integer>> out) throws Exception {
+
+ Long elementTimestamp = ctx.timestamp();
+ long nextTen = ((elementTimestamp + 10)
/ 10) * 10;
+
ctx.timerService().registerEventTimeTimer(nextTen);
+
+ if (elementTimestamp <
ctx.timerService().currentWatermark()) {
+ ctx.output(lateElements, value);
+ } else {
+ Long previousTimestamp =
Optional.ofNullable(previousTimestampState.value()).orElse(0L);
+ assertThat(elementTimestamp,
greaterThanOrEqualTo(previousTimestamp));
+
previousTimestampState.update(elementTimestamp);
+
+ Integer currentCount =
Optional.ofNullable(countState.get(nextTen)).orElse(0);
+ countState.put(nextTen,
currentCount + 1);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Tuple3<Long, Integer,
Integer>> out) throws Exception {
+ out.collect(Tuple3.of(timestamp,
ctx.getCurrentKey(), countState.get(timestamp)));
+ countState.remove(timestamp);
+
+ // this would go in infinite loop if we
did not quiesce the timer service.
+
ctx.timerService().registerEventTimeTimer(timestamp + 1);
+ }
+ });
+
+ DataStream<Integer> lateStream =
sums.getSideOutput(lateElements);
+ List<Integer> lateRecordsCollected =
CollectionUtil.iteratorToList(DataStreamUtils.collect(lateStream));
+ List<Tuple3<Long, Integer, Integer>> sumsCollected =
CollectionUtil.iteratorToList(DataStreamUtils.collect(sums));
+
+ assertTrue(lateRecordsCollected.isEmpty());
+ assertThat(sumsCollected, equalTo(
+ Arrays.asList(
+ Tuple3.of(10L, 1, 4),
+ Tuple3.of(20L, 1, 3),
+ Tuple3.of(10L, 2, 2),
+ Tuple3.of(20L, 2, 1)
+ )
+ ));
+ }
+
+ @Test
+ public void testBatchExecutionWithTimersTwoInput() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1); // set parallelism to 1 to have
consistent order of results
+
+ Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ env.configure(config, this.getClass().getClassLoader());
+
+ WatermarkStrategy<Tuple2<Integer, Integer>> watermarkStrategy =
+ WatermarkStrategy.forGenerator(
+ ctx -> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP
+ ).withTimestampAssigner(
+ (r, previousTimestamp) -> r.f1
+ );
+ SingleOutputStreamOperator<Integer> elements1 =
env.fromElements(
+ Tuple2.of(1, 3),
+ Tuple2.of(1, 1),
+ Tuple2.of(2, 1),
+ Tuple2.of(1, 4),
+ // emit watermark = 5
+ Tuple2.of(2, 3), // late element
+ Tuple2.of(1, 2), // late element
+ Tuple2.of(1, 13),
+ Tuple2.of(1, 11),
+ Tuple2.of(2, 14),
+ // emit watermark = 15
+ Tuple2.of(1, 11) // late element
+ ).assignTimestampsAndWatermarks(watermarkStrategy).map(element
-> element.f0);
+
+ SingleOutputStreamOperator<Integer> elements2 =
env.fromElements(
+ Tuple2.of(1, 3),
+ Tuple2.of(1, 1),
+ Tuple2.of(2, 1),
+ Tuple2.of(1, 4),
+ // emit watermark = 5
+ Tuple2.of(2, 3), // late element
+ Tuple2.of(1, 2), // late element
+ Tuple2.of(1, 13),
+ Tuple2.of(1, 11),
+ Tuple2.of(2, 14),
+ // emit watermark = 15
+ Tuple2.of(1, 11) // late element
+ ).assignTimestampsAndWatermarks(watermarkStrategy).map(element
-> element.f0);
+
+ OutputTag<Integer> lateElements = new
OutputTag<>("late_elements", BasicTypeInfo.INT_TYPE_INFO);
+ SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> sums
= elements1.connect(elements2)
+ .keyBy(element -> element, element -> element)
+ .process(new KeyedCoProcessFunction<Integer, Integer,
Integer, Tuple3<Long, Integer, Integer>>() {
+
+ private MapState<Long, Integer> countState;
+ private ValueState<Long> previousTimestampState;
+
+ @Override
+ public void open(Configuration parameters) {
+ countState =
getRuntimeContext().getMapState(
+ new MapStateDescriptor<>(
+ "sum",
+
BasicTypeInfo.LONG_TYPE_INFO,
+
BasicTypeInfo.INT_TYPE_INFO)
+ );
+ previousTimestampState =
getRuntimeContext().getState(
+ new ValueStateDescriptor<>(
+ "previousTimestamp",
+
BasicTypeInfo.LONG_TYPE_INFO
+ )
+ );
+ }
+
+ @Override
+ public void processElement1(
+ Integer value,
+ Context ctx,
+ Collector<Tuple3<Long, Integer,
Integer>> out) throws Exception {
+ processElement(value, ctx);
+ }
+
+ @Override
+ public void processElement2(
+ Integer value,
+ Context ctx,
+ Collector<Tuple3<Long, Integer,
Integer>> out) throws Exception {
+ processElement(value, ctx);
+ }
+
+ private void processElement(
+ Integer value,
+ Context ctx) throws Exception {
+ Long elementTimestamp = ctx.timestamp();
+ long nextTen = ((elementTimestamp + 10)
/ 10) * 10;
+
ctx.timerService().registerEventTimeTimer(nextTen);
+ if (elementTimestamp <
ctx.timerService().currentWatermark()) {
+ ctx.output(lateElements, value);
+ } else {
+ Long previousTimestamp =
Optional.ofNullable(previousTimestampState.value()).orElse(0L);
+ assertThat(elementTimestamp,
greaterThanOrEqualTo(previousTimestamp));
+
previousTimestampState.update(elementTimestamp);
+
+ Integer currentCount =
Optional.ofNullable(countState.get(nextTen)).orElse(0);
+ countState.put(nextTen,
currentCount + 1);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Tuple3<Long, Integer,
Integer>> out) throws Exception {
+ out.collect(Tuple3.of(timestamp,
ctx.getCurrentKey(), countState.get(timestamp)));
+ countState.remove(timestamp);
+
+ // this would go in infinite loop if we
did not quiesce the timer service.
+
ctx.timerService().registerEventTimeTimer(timestamp + 1);
+ }
+ });
+
+ DataStream<Integer> lateStream =
sums.getSideOutput(lateElements);
+ List<Integer> lateRecordsCollected =
CollectionUtil.iteratorToList(DataStreamUtils.collect(lateStream));
+ List<Tuple3<Long, Integer, Integer>> sumsCollected =
CollectionUtil.iteratorToList(DataStreamUtils.collect(sums));
+
+ assertTrue(lateRecordsCollected.isEmpty());
+ assertThat(sumsCollected, equalTo(
+ Arrays.asList(
+ Tuple3.of(10L, 1, 8),
+ Tuple3.of(20L, 1, 6),
+ Tuple3.of(10L, 2, 4),
+ Tuple3.of(20L, 2, 2)
+ )
+ ));
}
+ private static final WatermarkGenerator<Tuple2<Integer, Integer>>
GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP =
+ new WatermarkGenerator<Tuple2<Integer, Integer>>() {
+ @Override
+ public void onEvent(
+ Tuple2<Integer, Integer> event,
+ long eventTimestamp,
+ WatermarkOutput output) {
+ if (eventTimestamp == 4) {
+ output.emitWatermark(new Watermark(5));
+ } else if (eventTimestamp == 14) {
+ output.emitWatermark(new Watermark(15));
+ }
+ }
+
+ @Override
+ public void onPeriodicEmit(WatermarkOutput output) {
+
+ }
+ };
+
private static class AssertingOperator extends
AbstractStreamOperator<Long>
implements OneInputStreamOperator<Tuple2<Integer,
byte[]>, Long>, BoundedOneInput {
private final Set<Integer> seenKeys = new HashSet<>();
@@ -333,7 +552,7 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
}
@Override
- public void endInput(int inputId) throws Exception {
+ public void endInput(int inputId) {
if (inputId == 1) {
input1Finished = true;
}
@@ -398,17 +617,17 @@ public class SortingBoundedInputITCase extends AbstractTestBase {
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
+ public void
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
}
@Override
- public void processLatencyMarker(LatencyMarker latencyMarker)
throws Exception {
+ public void processLatencyMarker(LatencyMarker latencyMarker) {
}
@Override
- public void setKeyContextElement(StreamRecord<Tuple2<Integer,
byte[]>> record) throws Exception {
+ public void setKeyContextElement(StreamRecord<Tuple2<Integer,
byte[]>> record) {
}
}