http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java new file mode 100644 index 0000000..e9c5eeb --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators.co; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction; +import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link CoStreamTimelyFlatMap}. + */ +public class TimelyCoFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(17)); + testHarness.processWatermark2(new Watermark(17)); + testHarness.processElement1(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark1(new Watermark(42)); + testHarness.processWatermark2(new Watermark(42)); + testHarness.processElement2(new StreamRecord<>("6", 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5WM:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6WM:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement1(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement2(new StreamRecord<>("6")); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5PT:17")); + expectedOutput.add(new StreamRecord<>("6PT:42")); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testEventTimeTimers() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17, 42L)); + testHarness.processElement2(new StreamRecord<>("18", 42L)); + + testHarness.processWatermark1(new Watermark(5)); + testHarness.processWatermark2(new Watermark(5)); + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L)); + expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L)); + expectedOutput.add(new StreamRecord<>("1777", 5L)); + expectedOutput.add(new Watermark(5L)); + expectedOutput.add(new StreamRecord<>("1777", 6L)); + expectedOutput.add(new Watermark(6L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTimers() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(17)); + testHarness.processElement2(new StreamRecord<>("18")); + + testHarness.setProcessingTime(5); + testHarness.setProcessingTime(6); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:18")); + expectedOutput.add(new StreamRecord<>("1777", 5L)); + expectedOutput.add(new StreamRecord<>("1777", 6L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testEventTimeTimerWithState() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark1(new Watermark(1)); + testHarness.processWatermark2(new Watermark(1)); + testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6 + + testHarness.processWatermark1(new Watermark(2)); + testHarness.processWatermark2(new Watermark(2)); + testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7 + + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + testHarness.processWatermark1(new Watermark(7)); + testHarness.processWatermark2(new Watermark(7)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testProcessingTimeTimerWithState() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT1:17")); + expectedOutput.add(new StreamRecord<>("INPUT2:42")); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + + CoStreamTimelyFlatMap<String, Integer, String, String> operator = + new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + + TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness = + new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement1(new StreamRecord<>(5, 12L)); + testHarness.processElement2(new StreamRecord<>("5", 12L)); + + // snapshot and restore from scratch + StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0); + + testHarness.close(); + + operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + + testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( + operator, + new IntToStringKeySelector<>(), + new IdentityKeySelector<String>(), + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setup(); + testHarness.restore(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark1(new Watermark(6)); + testHarness.processWatermark2(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + + private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Integer value) throws Exception { + return "" + value; + } + } + + private static class IdentityKeySelector<T> implements KeySelector<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + + private static class WatermarkQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect(value + "WM:" + timerService.currentWatermark()); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + out.collect(value + "WM:" + timerService.currentWatermark()); + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + } + } + + private static class EventTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + timerService.registerEventTimeTimer(5); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + timerService.registerEventTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + + assertEquals(TimeDomain.EVENT_TIME, timeDomain); + out.collect("" + 1777); + } + } + + private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<String> state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + getRuntimeContext().getState(state).update("" + value); + timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + getRuntimeContext().getState(state).update(value); + timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + } + + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + assertEquals(TimeDomain.EVENT_TIME, timeDomain); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + timerService.registerProcessingTimeTimer(5); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + timerService.registerProcessingTimeTimer(6); + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + + assertEquals(TimeDomain.PROCESSING_TIME, timeDomain); + out.collect("" + 1777); + } + } + + private static class ProcessingTimeQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect(value + "PT:" + timerService.currentProcessingTime()); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + out.collect(value + "PT:" + timerService.currentProcessingTime()); + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + } + } + + private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<String> state = + new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null); + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT1:" + value); + getRuntimeContext().getState(state).update("" + value); + timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT2:" + value); + getRuntimeContext().getState(state).update(value); + timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + } + + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + assertEquals(TimeDomain.PROCESSING_TIME, timeDomain); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class BothTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap1(Integer value, TimerService timerService, Collector<String> out) throws Exception { + timerService.registerEventTimeTimer(6); + } + + @Override + public void flatMap2(String value, TimerService timerService, Collector<String> out) throws Exception { + timerService.registerProcessingTimeTimer(5); + } + + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + out.collect("EVENT:1777"); + } else { + out.collect("PROC:1777"); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 3dd2ed7..52311f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -576,12 +576,9 @@ public class OneInputStreamTaskTest extends TestLogger { } @Override - public void processWatermark(Watermark mark) throws Exception { - - } - - @Override public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { + super.snapshotState(out, checkpointId, timestamp); + if (random == null) { random = new Random(seed); } @@ -595,6 +592,8 @@ public class OneInputStreamTaskTest extends TestLogger { @Override public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); + numberRestoreCalls++; if (random == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 95115d6..bc40a89 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -53,6 +53,7 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TemporaryFolder; import scala.Option; @@ -76,7 +77,11 @@ import static org.junit.Assert.fail; /** * TODO : parameterize to test all different state backends! + * + * Ignored for now since the timers in {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} + * are not repartitionable/key-group-aware. */ +@Ignore public class RescalingITCase extends TestLogger { private static final int numTaskManagers = 2; http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index fc48719..d794953 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -160,6 +160,7 @@ public class SavepointITCase extends TestLogger { config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); + config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0"); config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 5855214..530d6cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -726,6 +726,8 @@ public class TimestampITCase extends TestLogger { @Override public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + for (Watermark previousMark: watermarks) { assertTrue(previousMark.getTimestamp() < mark.getTimestamp()); } @@ -760,9 +762,6 @@ public class TimestampITCase extends TestLogger { } output.collect(element); } - - @Override - public void processWatermark(Watermark mark) throws Exception {} } public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { @@ -774,10 +773,6 @@ public class TimestampITCase extends TestLogger { } output.collect(element); } - - @Override - public void processWatermark(Watermark mark) throws Exception {} - } public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
