This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d4386e6  [FLINK-19640] Add an IT case for bounded execution
d4386e6 is described below

commit d4386e6ad211153c910bf55750cf13a23073b1cd
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Oct 16 09:56:21 2020 +0200

    [FLINK-19640] Add an IT case for bounded execution
    
    This closes #13666.
---
 .../runtime/SortingBoundedInputITCase.java         | 381 ++++++++++++++++-----
 1 file changed, 300 insertions(+), 81 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
index 7246418..262780a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SortingBoundedInputITCase.java
@@ -18,19 +18,30 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
@@ -43,45 +54,47 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
-import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
-import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
-import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import 
org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.SplittableIterator;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
-import java.util.UUID;
 import java.util.function.Consumer;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * An end to end test for sorted inputs for a keyed operator with bounded 
inputs.
  */
 public class SortingBoundedInputITCase extends AbstractTestBase {
+
        @Test
-       public void testOneInputOperator() throws Exception {
+       public void testOneInputOperator() {
                long numberOfRecords = 1_000_000;
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               Configuration config = new Configuration();
+               config.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+               env.configure(config, this.getClass().getClassLoader());
+
                DataStreamSource<Tuple2<Integer, byte[]>> elements = 
env.fromParallelCollection(
                        new InputGenerator(numberOfRecords),
                        new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
@@ -95,21 +108,7 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
                                new AssertingOperator()
                        );
 
-               // TODO we should replace this block with 
DataStreamUtils#collect once
-               // we have the automatic runtime mode determination in place.
-               CollectResultIterator<Long> collectedCounts = applyCollect(env, 
counts);
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.getStreamNode(counts.getId()).setSortedInputs(true);
-               Map<ManagedMemoryUseCase, Integer> operatorMemory = new 
HashMap<>();
-               operatorMemory.put(ManagedMemoryUseCase.BATCH_OP, 1);
-               
streamGraph.getStreamNode(counts.getId()).setManagedMemoryUseCaseWeights(
-                       operatorMemory,
-                       Collections.emptySet()
-               );
-               JobClient jobClient = env.executeAsync(streamGraph);
-               collectedCounts.setJobClient(jobClient);
-
-               long sum = CollectionUtil.iteratorToList(collectedCounts)
+               long sum = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(counts))
                        .stream()
                        .mapToLong(l -> l)
                        .sum();
@@ -118,9 +117,14 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
        }
 
        @Test
-       public void testTwoInputOperator() throws Exception {
+       public void testTwoInputOperator() {
                long numberOfRecords = 500_000;
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               Configuration config = new Configuration();
+               config.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+               env.configure(config, this.getClass().getClassLoader());
+
                DataStreamSource<Tuple2<Integer, byte[]>> elements1 = 
env.fromParallelCollection(
                        new InputGenerator(numberOfRecords),
                        new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
@@ -141,21 +145,7 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
                                new AssertingTwoInputOperator()
                        );
 
-               // TODO we should replace this block with 
DataStreamUtils#collect once
-               // we have the automatic runtime mode determination in place.
-               CollectResultIterator<Long> collectedCounts = applyCollect(env, 
counts);
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.getStreamNode(counts.getId()).setSortedInputs(true);
-               Map<ManagedMemoryUseCase, Integer> operatorMemory = new 
HashMap<>();
-               operatorMemory.put(ManagedMemoryUseCase.BATCH_OP, 1);
-               
streamGraph.getStreamNode(counts.getId()).setManagedMemoryUseCaseWeights(
-                       operatorMemory,
-                       Collections.emptySet()
-               );
-               JobClient jobClient = env.executeAsync(streamGraph);
-               collectedCounts.setJobClient(jobClient);
-
-               long sum = CollectionUtil.iteratorToList(collectedCounts)
+               long sum = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(counts))
                        .stream()
                        .mapToLong(l -> l)
                        .sum();
@@ -164,9 +154,14 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
        }
 
        @Test
-       public void testThreeInputOperator() throws Exception {
+       public void testThreeInputOperator() {
                long numberOfRecords = 500_000;
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               Configuration config = new Configuration();
+               config.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+               env.configure(config, this.getClass().getClassLoader());
+
                KeyedStream<Tuple2<Integer, byte[]>, Object> elements1 = 
env.fromParallelCollection(
                        new InputGenerator(numberOfRecords),
                        new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
@@ -196,21 +191,7 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
                env.addOperator(assertingTransformation);
                DataStream<Long> counts = new DataStream<>(env, 
assertingTransformation);
 
-               // TODO we should replace this block with 
DataStreamUtils#collect once
-               // we have the automatic runtime mode determination in place.
-               CollectResultIterator<Long> collectedCounts = applyCollect(env, 
counts);
-               StreamGraph streamGraph = env.getStreamGraph();
-               
streamGraph.getStreamNode(assertingTransformation.getId()).setSortedInputs(true);
-               Map<ManagedMemoryUseCase, Integer> operatorMemory = new 
HashMap<>();
-               operatorMemory.put(ManagedMemoryUseCase.BATCH_OP, 1);
-               
streamGraph.getStreamNode(counts.getId()).setManagedMemoryUseCaseWeights(
-                       operatorMemory,
-                       Collections.emptySet()
-               );
-               JobClient jobClient = env.executeAsync(streamGraph);
-               collectedCounts.setJobClient(jobClient);
-
-               long sum = CollectionUtil.iteratorToList(collectedCounts)
+               long sum = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(counts))
                        .stream()
                        .mapToLong(l -> l)
                        .sum();
@@ -218,24 +199,262 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
                assertThat(sum, equalTo(numberOfRecords * 3));
        }
 
-       private CollectResultIterator<Long> 
applyCollect(StreamExecutionEnvironment env, DataStream<Long> counts) {
-               String accumulatorName = "dataStreamCollect_" + 
UUID.randomUUID().toString();
-
-               CollectSinkOperatorFactory<Long> factory = new 
CollectSinkOperatorFactory<>(
-                       new LongSerializer(),
-                       accumulatorName);
-               CollectSinkOperator<Long> operator = 
(CollectSinkOperator<Long>) factory.getOperator();
-               CollectStreamSink<Long> sink = new CollectStreamSink<>(counts, 
factory);
-               sink.name("Data stream collect sink");
-               env.addOperator(sink.getTransformation());
-
-               return new CollectResultIterator<>(
-                       operator.getOperatorIdFuture(),
-                       new LongSerializer(),
-                       accumulatorName,
-                       env.getCheckpointConfig());
+       @Test
+       public void testBatchExecutionWithTimersOneInput() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1); // set parallelism to 1 to have 
consistent order of results
+
+               Configuration config = new Configuration();
+               config.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+               env.configure(config, this.getClass().getClassLoader());
+
+               WatermarkStrategy<Tuple2<Integer, Integer>> watermarkStrategy =
+                       WatermarkStrategy.forGenerator(
+                               ctx -> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP
+                       ).withTimestampAssigner(
+                               (r, previousTimestamp) -> r.f1
+                       );
+               SingleOutputStreamOperator<Tuple2<Integer, Integer>> elements = 
env.fromElements(
+                       Tuple2.of(1, 3),
+                       Tuple2.of(1, 1),
+                       Tuple2.of(2, 1),
+                       Tuple2.of(1, 4),
+                       // emit watermark = 5
+                       Tuple2.of(2, 3), // late element
+                       Tuple2.of(1, 2), // late element
+                       Tuple2.of(1, 13),
+                       Tuple2.of(1, 11),
+                       Tuple2.of(2, 14),
+                       // emit watermark = 15
+                       Tuple2.of(1, 11) // late element
+               ).assignTimestampsAndWatermarks(watermarkStrategy);
+
+               OutputTag<Integer> lateElements = new 
OutputTag<>("late_elements", BasicTypeInfo.INT_TYPE_INFO);
+               SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> sums 
= elements
+                       .map(element -> element.f0)
+                       .keyBy(element -> element)
+                       .process(new KeyedProcessFunction<Integer, Integer, 
Tuple3<Long, Integer, Integer>>() {
+
+                               private MapState<Long, Integer> countState;
+                               private ValueState<Long> previousTimestampState;
+
+                               @Override
+                               public void open(Configuration parameters) {
+                                       countState = 
getRuntimeContext().getMapState(
+                                               new MapStateDescriptor<>(
+                                                       "sum",
+                                                       
BasicTypeInfo.LONG_TYPE_INFO,
+                                                       
BasicTypeInfo.INT_TYPE_INFO)
+                                       );
+                                       previousTimestampState = 
getRuntimeContext().getState(
+                                               new ValueStateDescriptor<>(
+                                                       "previousTimestamp",
+                                                       
BasicTypeInfo.LONG_TYPE_INFO
+                                               )
+                                       );
+                               }
+
+                               @Override
+                               public void processElement(
+                                               Integer value,
+                                               Context ctx,
+                                               Collector<Tuple3<Long, Integer, 
Integer>> out) throws Exception {
+
+                                       Long elementTimestamp = ctx.timestamp();
+                                       long nextTen = ((elementTimestamp + 10) 
/ 10) * 10;
+                                       
ctx.timerService().registerEventTimeTimer(nextTen);
+
+                                       if (elementTimestamp < 
ctx.timerService().currentWatermark()) {
+                                               ctx.output(lateElements, value);
+                                       } else {
+                                               Long previousTimestamp = 
Optional.ofNullable(previousTimestampState.value()).orElse(0L);
+                                               assertThat(elementTimestamp, 
greaterThanOrEqualTo(previousTimestamp));
+                                               
previousTimestampState.update(elementTimestamp);
+
+                                               Integer currentCount = 
Optional.ofNullable(countState.get(nextTen)).orElse(0);
+                                               countState.put(nextTen, 
currentCount + 1);
+                                       }
+                               }
+
+                               @Override
+                               public void onTimer(
+                                               long timestamp,
+                                               OnTimerContext ctx,
+                                               Collector<Tuple3<Long, Integer, 
Integer>> out) throws Exception {
+                                       out.collect(Tuple3.of(timestamp, 
ctx.getCurrentKey(), countState.get(timestamp)));
+                                       countState.remove(timestamp);
+
+                                       // this would go in infinite loop if we 
did not quiesce the timer service.
+                                       
ctx.timerService().registerEventTimeTimer(timestamp + 1);
+                               }
+                       });
+
+               DataStream<Integer> lateStream = 
sums.getSideOutput(lateElements);
+               List<Integer> lateRecordsCollected = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(lateStream));
+               List<Tuple3<Long, Integer, Integer>> sumsCollected = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(sums));
+
+               assertTrue(lateRecordsCollected.isEmpty());
+               assertThat(sumsCollected, equalTo(
+                       Arrays.asList(
+                               Tuple3.of(10L, 1, 4),
+                               Tuple3.of(20L, 1, 3),
+                               Tuple3.of(10L, 2, 2),
+                               Tuple3.of(20L, 2, 1)
+                       )
+               ));
+       }
+
+       @Test
+       public void testBatchExecutionWithTimersTwoInput() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1); // set parallelism to 1 to have 
consistent order of results
+
+               Configuration config = new Configuration();
+               config.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
+               env.configure(config, this.getClass().getClassLoader());
+
+               WatermarkStrategy<Tuple2<Integer, Integer>> watermarkStrategy =
+                       WatermarkStrategy.forGenerator(
+                               ctx -> GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP
+                       ).withTimestampAssigner(
+                               (r, previousTimestamp) -> r.f1
+                       );
+               SingleOutputStreamOperator<Integer> elements1 = 
env.fromElements(
+                       Tuple2.of(1, 3),
+                       Tuple2.of(1, 1),
+                       Tuple2.of(2, 1),
+                       Tuple2.of(1, 4),
+                       // emit watermark = 5
+                       Tuple2.of(2, 3), // late element
+                       Tuple2.of(1, 2), // late element
+                       Tuple2.of(1, 13),
+                       Tuple2.of(1, 11),
+                       Tuple2.of(2, 14),
+                       // emit watermark = 15
+                       Tuple2.of(1, 11) // late element
+               ).assignTimestampsAndWatermarks(watermarkStrategy).map(element 
-> element.f0);
+
+               SingleOutputStreamOperator<Integer> elements2 = 
env.fromElements(
+                       Tuple2.of(1, 3),
+                       Tuple2.of(1, 1),
+                       Tuple2.of(2, 1),
+                       Tuple2.of(1, 4),
+                       // emit watermark = 5
+                       Tuple2.of(2, 3), // late element
+                       Tuple2.of(1, 2), // late element
+                       Tuple2.of(1, 13),
+                       Tuple2.of(1, 11),
+                       Tuple2.of(2, 14),
+                       // emit watermark = 15
+                       Tuple2.of(1, 11) // late element
+               ).assignTimestampsAndWatermarks(watermarkStrategy).map(element 
-> element.f0);
+
+               OutputTag<Integer> lateElements = new 
OutputTag<>("late_elements", BasicTypeInfo.INT_TYPE_INFO);
+               SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> sums 
= elements1.connect(elements2)
+                       .keyBy(element -> element, element -> element)
+                       .process(new KeyedCoProcessFunction<Integer, Integer, 
Integer, Tuple3<Long, Integer, Integer>>() {
+
+                               private MapState<Long, Integer> countState;
+                               private ValueState<Long> previousTimestampState;
+
+                               @Override
+                               public void open(Configuration parameters) {
+                                       countState = 
getRuntimeContext().getMapState(
+                                               new MapStateDescriptor<>(
+                                                       "sum",
+                                                       
BasicTypeInfo.LONG_TYPE_INFO,
+                                                       
BasicTypeInfo.INT_TYPE_INFO)
+                                       );
+                                       previousTimestampState = 
getRuntimeContext().getState(
+                                               new ValueStateDescriptor<>(
+                                                       "previousTimestamp",
+                                                       
BasicTypeInfo.LONG_TYPE_INFO
+                                               )
+                                       );
+                               }
+
+                               @Override
+                               public void processElement1(
+                                               Integer value,
+                                               Context ctx,
+                                               Collector<Tuple3<Long, Integer, 
Integer>> out) throws Exception {
+                                       processElement(value, ctx);
+                               }
+
+                               @Override
+                               public void processElement2(
+                                               Integer value,
+                                               Context ctx,
+                                               Collector<Tuple3<Long, Integer, 
Integer>> out) throws Exception {
+                                       processElement(value, ctx);
+                               }
+
+                               private void processElement(
+                                               Integer value,
+                                               Context ctx) throws Exception {
+                                       Long elementTimestamp = ctx.timestamp();
+                                       long nextTen = ((elementTimestamp + 10) 
/ 10) * 10;
+                                       
ctx.timerService().registerEventTimeTimer(nextTen);
+                                       if (elementTimestamp < 
ctx.timerService().currentWatermark()) {
+                                               ctx.output(lateElements, value);
+                                       } else {
+                                               Long previousTimestamp = 
Optional.ofNullable(previousTimestampState.value()).orElse(0L);
+                                               assertThat(elementTimestamp, 
greaterThanOrEqualTo(previousTimestamp));
+                                               
previousTimestampState.update(elementTimestamp);
+
+                                               Integer currentCount = 
Optional.ofNullable(countState.get(nextTen)).orElse(0);
+                                               countState.put(nextTen, 
currentCount + 1);
+                                       }
+                               }
+
+                               @Override
+                               public void onTimer(
+                                               long timestamp,
+                                               OnTimerContext ctx,
+                                               Collector<Tuple3<Long, Integer, 
Integer>> out) throws Exception {
+                                       out.collect(Tuple3.of(timestamp, 
ctx.getCurrentKey(), countState.get(timestamp)));
+                                       countState.remove(timestamp);
+
+                                       // this would go in infinite loop if we 
did not quiesce the timer service.
+                                       
ctx.timerService().registerEventTimeTimer(timestamp + 1);
+                               }
+                       });
+
+               DataStream<Integer> lateStream = 
sums.getSideOutput(lateElements);
+               List<Integer> lateRecordsCollected = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(lateStream));
+               List<Tuple3<Long, Integer, Integer>> sumsCollected = 
CollectionUtil.iteratorToList(DataStreamUtils.collect(sums));
+
+               assertTrue(lateRecordsCollected.isEmpty());
+               assertThat(sumsCollected, equalTo(
+                       Arrays.asList(
+                               Tuple3.of(10L, 1, 8),
+                               Tuple3.of(20L, 1, 6),
+                               Tuple3.of(10L, 2, 4),
+                               Tuple3.of(20L, 2, 2)
+                       )
+               ));
        }
 
+       private static final WatermarkGenerator<Tuple2<Integer, Integer>> 
GENERATE_WATERMARK_AFTER_4_14_TIMESTAMP =
+               new WatermarkGenerator<Tuple2<Integer, Integer>>() {
+                       @Override
+                       public void onEvent(
+                                       Tuple2<Integer, Integer> event,
+                                       long eventTimestamp,
+                                       WatermarkOutput output) {
+                               if (eventTimestamp == 4) {
+                                       output.emitWatermark(new Watermark(5));
+                               } else if (eventTimestamp == 14) {
+                                       output.emitWatermark(new Watermark(15));
+                               }
+                       }
+
+                       @Override
+                       public void onPeriodicEmit(WatermarkOutput output) {
+
+                       }
+               };
+
        private static class AssertingOperator extends 
AbstractStreamOperator<Long>
                        implements OneInputStreamOperator<Tuple2<Integer, 
byte[]>, Long>, BoundedOneInput {
                private final Set<Integer> seenKeys = new HashSet<>();
@@ -333,7 +552,7 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
                }
 
                @Override
-               public void endInput(int inputId) throws Exception {
+               public void endInput(int inputId) {
                        if (inputId == 1) {
                                input1Finished = true;
                        }
@@ -398,17 +617,17 @@ public class SortingBoundedInputITCase extends 
AbstractTestBase {
                }
 
                @Override
-               public void processWatermark(Watermark mark) throws Exception {
+               public void 
processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
 
                }
 
                @Override
-               public void processLatencyMarker(LatencyMarker latencyMarker) 
throws Exception {
+               public void processLatencyMarker(LatencyMarker latencyMarker) {
 
                }
 
                @Override
-               public void setKeyContextElement(StreamRecord<Tuple2<Integer, 
byte[]>> record) throws Exception {
+               public void setKeyContextElement(StreamRecord<Tuple2<Integer, 
byte[]>> record) {
 
                }
        }

Reply via email to