This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d87808f  [FLINK-26708] TimestampsAndWatermarksOperator should not 
propagate WatermarkStatus
d87808f is described below

commit d87808fe8a2fe6538b902056490395ae8597a48b
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Mar 17 15:25:32 2022 +0100

    [FLINK-26708] TimestampsAndWatermarksOperator should not propagate 
WatermarkStatus
---
 .../runtime/operators/TimestampsAndWatermarksOperator.java |  4 ++++
 .../operators/TimestampsAndWatermarksOperatorTest.java     | 14 ++++++++++++++
 .../streaming/util/OneInputStreamOperatorTestHarness.java  | 11 +++++++++++
 3 files changed, 29 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index a10c92e..570cb6b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -126,6 +126,10 @@ public class TimestampsAndWatermarksOperator<T> extends 
AbstractStreamOperator<T
         }
     }
 
+    /** Override the base implementation to completely ignore statuses 
propagated from upstream. */
+    @Override
+    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws 
Exception {}
+
     @Override
     public void finish() throws Exception {
         super.finish();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
index 555a1a6..b7f3461 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;
@@ -57,6 +58,19 @@ public class TimestampsAndWatermarksOperatorTest {
     }
 
     @Test
+    public void inputStatusesAreNotForwarded() throws Exception {
+        OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+                createTestHarness(
+                        WatermarkStrategy.forGenerator((ctx) -> new 
PeriodicWatermarkGenerator())
+                                .withTimestampAssigner((ctx) -> new 
LongExtractor()));
+
+        testHarness.processWatermarkStatus(WatermarkStatus.IDLE);
+        testHarness.setProcessingTime(AUTO_WATERMARK_INTERVAL);
+
+        assertThat(testHarness.getOutput(), empty());
+    }
+
+    @Test
     public void longMaxInputWatermarkIsForwarded() throws Exception {
         OneInputStreamOperatorTestHarness<Long, Long> testHarness =
                 createTestHarness(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index a078a08..a65667d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -225,6 +226,16 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         processWatermark(new Watermark(watermark));
     }
 
+    public void processWatermarkStatus(WatermarkStatus status) throws 
Exception {
+        if (inputs.isEmpty()) {
+            getOneInputOperator().processWatermarkStatus(status);
+        } else {
+            checkState(inputs.size() == 1);
+            Input input = inputs.get(0);
+            input.processWatermarkStatus(status);
+        }
+    }
+
     public void processWatermark(Watermark mark) throws Exception {
         currentWatermark = mark.getTimestamp();
         if (inputs.isEmpty()) {

Reply via email to