This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e8f2ab26ebefda0d29cc7fd9ee0a9b5e28afeff0
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Tue Jan 14 14:16:22 2025 +0800

    [FLINK-37112][runtime] Process event time extension related watermarks in 
operator for DataStream V2
---
 .../datastream/impl/operators/ProcessOperator.java |  16 +-
 .../TwoInputBroadcastProcessOperator.java          |  24 +-
 .../TwoInputNonBroadcastProcessOperator.java       |  24 +-
 .../impl/operators/TwoOutputProcessOperator.java   |  16 +-
 .../runtime/io/AbstractStreamTaskNetworkInput.java |   7 +
 .../eventtime/EventTimeWatermarkCombiner.java      | 117 ++++++++
 .../eventtime/EventTimeWatermarkHandler.java       | 220 ++++++++++++++
 .../streaming/util/watermark/WatermarkUtils.java   |  26 ++
 .../eventtime/EventTimeWatermarkCombinerTest.java  | 276 ++++++++++++++++++
 .../eventtime/EventTimeWatermarkHandlerTest.java   | 317 +++++++++++++++++++++
 10 files changed, 1037 insertions(+), 6 deletions(-)

diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index cde50f26a74..1782e043414 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
 import 
org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
@@ -38,6 +39,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
 
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -60,6 +62,9 @@ public class ProcessOperator<IN, OUT>
     protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
             watermarkDeclarationMap;
 
+    // {@link EventTimeWatermarkHandler} will be used to process event time 
related watermarks
+    protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
     public ProcessOperator(OneInputStreamProcessFunction<IN, OUT> 
userFunction) {
         super(userFunction);
     }
@@ -99,6 +104,8 @@ public class ProcessOperator<IN, OUT>
         outputCollector = getOutputCollector();
         nonPartitionedContext = getNonPartitionedContext();
         partitionedContext.setNonPartitionedContext(nonPartitionedContext);
+        this.eventTimeWatermarkHandler =
+                new EventTimeWatermarkHandler(1, output, timeServiceManager);
 
         // Initialize event time extension related ProcessFunction
         if (userFunction instanceof ExtractEventTimeProcessFunction) {
@@ -128,7 +135,14 @@ public class ProcessOperator<IN, OUT>
                                 .get(watermark.getWatermark().getIdentifier())
                                 .getDefaultHandlingStrategy()
                         == WatermarkHandlingStrategy.FORWARD) {
-            output.emitWatermark(watermark);
+
+            if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark()))
 {
+                // if the watermark is event time related watermark, process 
them to advance event
+                // time
+                
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
+            } else {
+                output.emitWatermark(watermark);
+            }
         }
     }
 
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index 8f3e38d14a2..926c133f0fe 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
@@ -37,6 +38,7 @@ import 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
 
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -62,6 +64,9 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
     protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
             watermarkDeclarationMap;
 
+    // {@link EventTimeWatermarkHandler} will be used to process event time 
related watermarks
+    protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
     public TwoInputBroadcastProcessOperator(
             TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> 
userFunction) {
         super(userFunction);
@@ -100,6 +105,9 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
                         getOperatorStateBackend());
         this.nonPartitionedContext = getNonPartitionedContext();
         
this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext);
+        this.eventTimeWatermarkHandler =
+                new EventTimeWatermarkHandler(2, output, timeServiceManager);
+
         this.userFunction.open(this.nonPartitionedContext);
     }
 
@@ -126,7 +134,13 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, 
OUT>
                                 .get(watermark.getWatermark().getIdentifier())
                                 .getDefaultHandlingStrategy()
                         == WatermarkHandlingStrategy.FORWARD) {
-            output.emitWatermark(watermark);
+            if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark()))
 {
+                // if the watermark is event time related watermark, process 
them to advance event
+                // time
+                
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
+            } else {
+                output.emitWatermark(watermark);
+            }
         }
     }
 
@@ -140,7 +154,13 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, 
OUT>
                                 .get(watermark.getWatermark().getIdentifier())
                                 .getDefaultHandlingStrategy()
                         == WatermarkHandlingStrategy.FORWARD) {
-            output.emitWatermark(watermark);
+            if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark()))
 {
+                // if the watermark is event time related watermark, process 
them to advance event
+                // time
+                
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 1);
+            } else {
+                output.emitWatermark(watermark);
+            }
         }
     }
 
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index fcf6d8698be..3d699dbbf2d 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -38,6 +39,7 @@ import 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
 
 import java.util.Map;
 import java.util.function.BiConsumer;
@@ -63,6 +65,9 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
     protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
             watermarkDeclarationMap;
 
+    // {@link EventTimeWatermarkHandler} will be used to process event time 
related watermarks
+    protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
     public TwoInputNonBroadcastProcessOperator(
             TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> 
userFunction) {
         super(userFunction);
@@ -103,6 +108,9 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
                         operatorStateBackend);
         this.nonPartitionedContext = getNonPartitionedContext();
         
this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext);
+        this.eventTimeWatermarkHandler =
+                new EventTimeWatermarkHandler(2, output, timeServiceManager);
+
         this.userFunction.open(this.nonPartitionedContext);
     }
 
@@ -129,7 +137,13 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
                                 .get(watermark.getWatermark().getIdentifier())
                                 .getDefaultHandlingStrategy()
                         == WatermarkHandlingStrategy.FORWARD) {
-            output.emitWatermark(watermark);
+            if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark()))
 {
+                // if the watermark is event time related watermark, process 
them to advance event
+                // time
+                
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
+            } else {
+                output.emitWatermark(watermark);
+            }
         }
     }
 
@@ -143,7 +157,13 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
                                 .get(watermark.getWatermark().getIdentifier())
                                 .getDefaultHandlingStrategy()
                         == WatermarkHandlingStrategy.FORWARD) {
-            output.emitWatermark(watermark);
+            if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark()))
 {
+                // if the watermark is event time related watermark, process 
them to advance event
+                // time
+                
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 1);
+            } else {
+                output.emitWatermark(watermark);
+            }
         }
     }
 
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index aefbc55a1f6..91d0c0f074f 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext;
 import 
org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -39,6 +40,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
 import org.apache.flink.util.OutputTag;
 
 import java.util.Map;
@@ -70,6 +72,9 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
     protected transient Map<String, AbstractInternalWatermarkDeclaration<?>>
             watermarkDeclarationMap;
 
+    // {@link EventTimeWatermarkHandler} will be used to process event time 
related watermarks
+    protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
     public TwoOutputProcessOperator(
             TwoOutputStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE> 
userFunction,
             OutputTag<OUT_SIDE> outputTag) {
@@ -112,6 +117,9 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, 
OUT_SIDE>
                         operatorStateStore);
         this.nonPartitionedContext = getNonPartitionedContext();
         
this.partitionedContext.setNonPartitionedContext(nonPartitionedContext);
+        this.eventTimeWatermarkHandler =
+                new EventTimeWatermarkHandler(1, output, timeServiceManager);
+
         this.userFunction.open(this.nonPartitionedContext);
     }
 
@@ -136,7 +144,13 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, 
OUT_SIDE>
                                 .get(watermark.getWatermark().getIdentifier())
                                 .getDefaultHandlingStrategy()
                         == WatermarkHandlingStrategy.FORWARD) {
-            output.emitWatermark(watermark);
+            if 
(EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark.getWatermark()))
 {
+                // if the watermark is event time related watermark, process 
them to advance event
+                // time
+                
eventTimeWatermarkHandler.processWatermark(watermark.getWatermark(), 0);
+            } else {
+                output.emitWatermark(watermark);
+            }
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index cb6675d81ba..c076f5bf433 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamTask.CanEmitBatchOfRecords
 import 
org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
 import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
 import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.util.watermark.WatermarkUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
@@ -118,8 +119,14 @@ public abstract class AbstractStreamTaskNetworkInput<
         this.recordAttributesCombiner =
                 new 
RecordAttributesCombiner(checkpointedInputGate.getNumberOfInputChannels());
 
+        WatermarkUtils.addEventTimeWatermarkCombinerIfNeeded(
+                watermarkDeclarationSet, watermarkCombiners, 
flattenedChannelIndices.size());
         for (AbstractInternalWatermarkDeclaration<?> watermarkDeclaration :
                 watermarkDeclarationSet) {
+            if 
(watermarkCombiners.containsKey(watermarkDeclaration.getIdentifier())) {
+                continue;
+            }
+
             watermarkCombiners.put(
                     watermarkDeclaration.getIdentifier(),
                     watermarkDeclaration.createWatermarkCombiner(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java
new file mode 100644
index 00000000000..01731707293
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkCombiner.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.watermark.extension.eventtime;
+
+import org.apache.flink.api.common.watermark.BoolWatermark;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
+import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.util.function.Consumer;
+
+/**
+ * A {@link WatermarkCombiner} used to combine {@link EventTimeExtension} 
related watermarks in
+ * input channels.
+ */
+public class EventTimeWatermarkCombiner extends StatusWatermarkValve 
implements WatermarkCombiner {
+
+    private WrappedDataOutput<?> output;
+
+    public EventTimeWatermarkCombiner(int numInputChannels) {
+        super(numInputChannels);
+        this.output = new WrappedDataOutput<>();
+    }
+
+    @Override
+    public void combineWatermark(
+            Watermark watermark, int channelIndex, Consumer<Watermark> 
watermarkEmitter)
+            throws Exception {
+        output.setWatermarkEmitter(watermarkEmitter);
+
+        if (EventTimeExtension.isEventTimeWatermark(watermark)) {
+            inputWatermark(
+                    new org.apache.flink.streaming.api.watermark.Watermark(
+                            ((LongWatermark) watermark).getValue()),
+                    channelIndex,
+                    output);
+        } else if 
(EventTimeExtension.isIdleStatusWatermark(watermark.getIdentifier())) {
+            inputWatermarkStatus(
+                    new WatermarkStatus(
+                            ((BoolWatermark) watermark).getValue()
+                                    ? WatermarkStatus.IDLE_STATUS
+                                    : WatermarkStatus.ACTIVE_STATUS),
+                    channelIndex,
+                    output);
+        }
+    }
+
+    /** Wrap {@link DataOutput} to emit watermarks using {@code 
watermarkEmitter}. */
+    static class WrappedDataOutput<T> implements DataOutput<T> {
+
+        private Consumer<Watermark> watermarkEmitter;
+
+        public WrappedDataOutput() {}
+
+        public void setWatermarkEmitter(Consumer<Watermark> watermarkEmitter) {
+            this.watermarkEmitter = watermarkEmitter;
+        }
+
+        @Override
+        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
+            throw new RuntimeException("Should not emit records with this 
output.");
+        }
+
+        @Override
+        public void 
emitWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark)
+                throws Exception {
+            watermarkEmitter.accept(
+                    
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(
+                            watermark.getTimestamp()));
+        }
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) 
throws Exception {
+            watermarkEmitter.accept(
+                    
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(
+                            watermarkStatus.isIdle()));
+        }
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {
+            throw new RuntimeException("Should not emit LatencyMarker with 
this output.");
+        }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) 
throws Exception {
+            throw new RuntimeException("Should not emit RecordAttributes with 
this output.");
+        }
+
+        @Override
+        public void emitWatermark(WatermarkEvent watermark) throws Exception {
+            throw new RuntimeException("Should not emit WatermarkEvent with 
this output.");
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java
new file mode 100644
index 00000000000..4a3a4b5c5c0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/watermark/extension/eventtime/EventTimeWatermarkHandler.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.watermark.extension.eventtime;
+
+import org.apache.flink.api.common.watermark.BoolWatermark;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class is used to handle {@link EventTimeExtension} related watermarks 
in operator, such as
+ * {@link EventTimeExtension#EVENT_TIME_WATERMARK_DECLARATION} and {@link
+ * EventTimeExtension#IDLE_STATUS_WATERMARK_DECLARATION}. It will emit event 
time watermark and idle
+ * status to downstream operators according to received watermarks.
+ */
+public class EventTimeWatermarkHandler {
+
+    /** number of input of operator, it should between 1 and 2 in current 
design. */
+    private final int numOfInput;
+
+    private final Output<?> output;
+
+    private final List<EventTimeWithIdleStatus> eventTimePerInput;
+
+    /**
+     * time service manager is used to advance event time in operator, and it 
may be null if the
+     * operator is not keyed.
+     */
+    @Nullable private final InternalTimeServiceManager<?> timeServiceManager;
+
+    private long lastEmitWatermark = Long.MIN_VALUE;
+
+    private boolean lastEmitIdleStatus = false;
+
+    /** A bitset to record whether the watermark has been received from each 
input. */
+    private final BitSet hasReceiveWatermarks;
+
+    public EventTimeWatermarkHandler(
+            int numOfInput,
+            Output<?> output,
+            @Nullable InternalTimeServiceManager<?> timeServiceManager) {
+        checkArgument(numOfInput >= 1 && numOfInput <= 2, "numOfInput should 
between 1 and 2");
+        this.numOfInput = numOfInput;
+        this.output = output;
+        this.eventTimePerInput = new ArrayList<>(numOfInput);
+        for (int i = 0; i < numOfInput; i++) {
+            eventTimePerInput.add(new EventTimeWithIdleStatus());
+        }
+        this.timeServiceManager = timeServiceManager;
+        this.hasReceiveWatermarks = new BitSet(numOfInput);
+    }
+
+    private EventTimeUpdateStatus processEventTime(long timestamp, int 
inputIndex)
+            throws Exception {
+        checkState(inputIndex < numOfInput);
+        hasReceiveWatermarks.set(inputIndex);
+        eventTimePerInput.get(inputIndex).setEventTime(timestamp);
+        eventTimePerInput.get(inputIndex).setIdleStatus(false);
+
+        return tryAdvanceEventTimeAndEmitWatermark();
+    }
+
+    private EventTimeUpdateStatus tryAdvanceEventTimeAndEmitWatermark() throws 
Exception {
+        // if current event time is larger than last emit watermark, emit it
+        long currentEventTime = getCurrentEventTime();
+        if (currentEventTime > lastEmitWatermark
+                && hasReceiveWatermarks.cardinality() == numOfInput) {
+            output.emitWatermark(
+                    new WatermarkEvent(
+                            
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(
+                                    currentEventTime),
+                            false));
+            lastEmitWatermark = currentEventTime;
+            if (timeServiceManager != null) {
+                timeServiceManager.advanceWatermark(new 
Watermark(currentEventTime));
+            }
+            return EventTimeUpdateStatus.ofUpdatedWatermark(lastEmitWatermark);
+        }
+        return EventTimeUpdateStatus.NO_UPDATE;
+    }
+
+    private void processEventTimeIdleStatus(boolean isIdle, int inputIndex) {
+        checkState(inputIndex < numOfInput);
+        hasReceiveWatermarks.set(inputIndex);
+        eventTimePerInput.get(inputIndex).setIdleStatus(isIdle);
+        tryEmitEventTimeIdleStatus();
+    }
+
+    private void tryEmitEventTimeIdleStatus() {
+        // emit idle status if current idle status is different from last emit
+        boolean inputIdle = isAllInputIdle();
+        if (inputIdle != lastEmitIdleStatus) {
+            output.emitWatermark(
+                    new WatermarkEvent(
+                            
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(
+                                    inputIdle),
+                            false));
+            lastEmitIdleStatus = inputIdle;
+        }
+    }
+
+    private long getCurrentEventTime() {
+        long currentEventTime = Long.MAX_VALUE;
+        for (EventTimeWithIdleStatus eventTimeWithIdleStatus : 
eventTimePerInput) {
+            if (!eventTimeWithIdleStatus.isIdle()) {
+                currentEventTime =
+                        Math.min(currentEventTime, 
eventTimeWithIdleStatus.getEventTime());
+            }
+        }
+        return currentEventTime;
+    }
+
+    private boolean isAllInputIdle() {
+        boolean allInputIsIdle = true;
+        for (EventTimeWithIdleStatus eventTimeWithIdleStatus : 
eventTimePerInput) {
+            allInputIsIdle &= eventTimeWithIdleStatus.isIdle();
+        }
+        return allInputIsIdle;
+    }
+
+    public long getLastEmitWatermark() {
+        return lastEmitWatermark;
+    }
+
+    /**
+     * Process EventTimeWatermark/IdleStatusWatermark.
+     *
+     * <p>It's caller's responsibility to check whether the watermark is
+     * EventTimeWatermark/IdleStatusWatermark.
+     *
+     * @return the status of event time watermark update.
+     */
+    public EventTimeUpdateStatus processWatermark(
+            org.apache.flink.api.common.watermark.Watermark watermark, int 
inputIndex)
+            throws Exception {
+        if 
(EventTimeExtension.isEventTimeWatermark(watermark.getIdentifier())) {
+            long timestamp = ((LongWatermark) watermark).getValue();
+            return this.processEventTime(timestamp, inputIndex);
+        } else if 
(EventTimeExtension.isIdleStatusWatermark(watermark.getIdentifier())) {
+            boolean isIdle = ((BoolWatermark) watermark).getValue();
+            this.processEventTimeIdleStatus(isIdle, inputIndex);
+        }
+        return EventTimeUpdateStatus.NO_UPDATE;
+    }
+
+    /** This class represents event-time updated status. */
+    public static class EventTimeUpdateStatus {
+
+        public static final EventTimeUpdateStatus NO_UPDATE = new 
EventTimeUpdateStatus(false, -1L);
+
+        private final boolean isEventTimeUpdated;
+
+        private final long newEventTime;
+
+        private EventTimeUpdateStatus(boolean isEventTimeUpdated, long 
newEventTime) {
+            this.isEventTimeUpdated = isEventTimeUpdated;
+            this.newEventTime = newEventTime;
+        }
+
+        public boolean isEventTimeUpdated() {
+            return isEventTimeUpdated;
+        }
+
+        public long getNewEventTime() {
+            return newEventTime;
+        }
+
+        public static EventTimeUpdateStatus ofUpdatedWatermark(long 
newEventTime) {
+            return new EventTimeUpdateStatus(true, newEventTime);
+        }
+    }
+
+    static class EventTimeWithIdleStatus {
+        private long eventTime = Long.MIN_VALUE;
+        private boolean isIdle = false;
+
+        public long getEventTime() {
+            return eventTime;
+        }
+
+        public void setEventTime(long eventTime) {
+            this.eventTime = Math.max(this.eventTime, eventTime);
+        }
+
+        public boolean isIdle() {
+            return isIdle;
+        }
+
+        public void setIdleStatus(boolean idle) {
+            isIdle = idle;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java
index 38f8602d8ba..8954578ff4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/util/watermark/WatermarkUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.util.watermark;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.watermark.Watermark;
 import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
 import org.apache.flink.datastream.api.function.ProcessFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -29,9 +30,12 @@ import 
org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import 
org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
+import org.apache.flink.streaming.runtime.watermark.WatermarkCombiner;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkCombiner;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -95,4 +99,26 @@ public final class WatermarkUtils {
                 .map(AbstractInternalWatermarkDeclaration::from)
                 .collect(Collectors.toSet());
     }
+
+    /** Create watermark combiners if there are event time watermark 
declarations. */
+    public static void addEventTimeWatermarkCombinerIfNeeded(
+            Set<AbstractInternalWatermarkDeclaration<?>> 
watermarkDeclarationSet,
+            Map<String, WatermarkCombiner> watermarkCombiners,
+            int numberOfInputChannels) {
+        if (watermarkDeclarationSet.stream()
+                .anyMatch(
+                        declaration ->
+                                EventTimeExtension.isEventTimeWatermark(
+                                        declaration.getIdentifier()))) {
+            // create event time watermark combiner
+            EventTimeWatermarkCombiner eventTimeWatermarkCombiner =
+                    new EventTimeWatermarkCombiner(numberOfInputChannels);
+            watermarkCombiners.put(
+                    
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.getIdentifier(),
+                    eventTimeWatermarkCombiner);
+            watermarkCombiners.put(
+                    
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.getIdentifier(),
+                    eventTimeWatermarkCombiner);
+        }
+    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java
new file mode 100644
index 00000000000..32f634dd8a3
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkCombinerTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream.extension.eventtime;
+
+import org.apache.flink.api.common.watermark.BoolWatermark;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkCombiner;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link EventTimeWatermarkCombiner}. */
+class EventTimeWatermarkCombinerTest {
+
+    private final List<Watermark> outputWatermarks = new ArrayList<>();
+    private EventTimeWatermarkCombiner combiner;
+
+    @BeforeEach
+    void before() {
+        combiner = new EventTimeWatermarkCombiner(2);
+    }
+
+    @AfterEach
+    void after() {
+        outputWatermarks.clear();
+        combiner = null;
+    }
+
+    @Test
+    void testCombinedResultIsMin() throws Exception {
+        // The test scenario is as follows:
+        // 
-----------------------------------------------------------------------------
+        //               test scenario     |         expected result
+        // 
-----------------------------------------------------------------------------
+        //    Step | Channel 0 | Channel 1 | output event time | output idle 
status
+        // 
-----------------------------------------------------------------------------
+        //     1   |     1     |     2     |         [1]       |    []
+        //     2   |     3     |           |       [1, 2]      |    []
+        // 
-----------------------------------------------------------------------------
+        // e.g. The step 1 means that Channel 0 will receive the event time 
watermark with value 1,
+        // and the Channel 1 will receive the event time watermark with value 
2.
+        // After step 1 has been executed, the combiner should output an event 
time watermark with
+        // value 1. And Step2 means that Channel 0 will receive the event time 
watermark with value
+        // 3, After step 1 has been executed, the combiner should output an 
event time watermark
+        // with value 2 again.
+
+        // Step 1
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1),
+                0,
+                outputWatermarks::add);
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L);
+
+        // Step 2
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+
+        checkOutputIdleStatusWatermarkValues();
+    }
+
+    @Test
+    void testCombineWhenPartialChannelsIdle() throws Exception {
+        // The test scenario is as follows:
+        // 
-----------------------------------------------------------------------------
+        //               test scenario     |         expected result
+        // 
-----------------------------------------------------------------------------
+        //    Step | Channel 0 | Channel 1 | output event time | output idle 
status
+        // 
-----------------------------------------------------------------------------
+        //     1   |     1     |           |         []        |    []
+        //     2   |           |   true    |         [1]       |    []
+        //     3   |     2     |           |       [1,2]       |    []
+        //     4   |           |   false   |       [1,2]       |    []
+        //     5   |           |     3     |       [1,2]       |    []
+        //     6   |     4     |           |      [1,2,3]      |    []
+        // 
-----------------------------------------------------------------------------
+        // e.g. The step 1 means that Channel 0 will receive the event time 
watermark with value 1.
+        // After step 1 has been executed, the combiner should not output any 
event time watermark
+        // as the combiner has not received event time watermark from all 
input channels.
+        // The step 2 means that Channel 1 will receive the idle status 
watermark with value true.
+        // After step 2 has been executed, the combiner should output an event 
time watermark with
+        // value 1.
+
+        // Step 1
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues();
+
+        // Step 2
+        combiner.combineWatermark(
+                
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L);
+
+        // Step 3
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+
+        // Step 4
+        combiner.combineWatermark(
+                
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+
+        // Step 5
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+
+        // Step 6
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(4),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L, 3L);
+
+        checkOutputIdleStatusWatermarkValues();
+    }
+
+    @Test
+    void testCombineWhenAllChannelsIdle() throws Exception {
+        // The test scenario is as follows:
+        // 
-----------------------------------------------------------------------------
+        //               test scenario     |         expected result
+        // 
-----------------------------------------------------------------------------
+        //    Step | Channel 0 | Channel 1 | output event time | output idle 
status
+        // 
-----------------------------------------------------------------------------
+        //     1   |     1     |     2     |         [1]       |    []
+        //     2   |    true   |           |        [1,2]      |    []
+        //     3   |           |   true    |        [1,2]      |    [true]
+        //     4   |   false   |           |        [1,2]      |    [true, 
false]
+        //     5   |     3     |           |       [1,2,3]     |    [true, 
false]
+        //     6   |           |   false   |       [1,2,3]     |    [true, 
false]
+        // 
-----------------------------------------------------------------------------
+
+        // Step 1
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1),
+                0,
+                outputWatermarks::add);
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L);
+
+        // Step 2
+        combiner.combineWatermark(
+                
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+        checkOutputIdleStatusWatermarkValues();
+
+        // Step 3
+        combiner.combineWatermark(
+                
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+        checkOutputIdleStatusWatermarkValues(true);
+
+        // Step 4
+        combiner.combineWatermark(
+                
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false),
+                0,
+                outputWatermarks::add);
+        checkOutputIdleStatusWatermarkValues(true, false);
+
+        // Step 5
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(1L, 2L, 3L);
+        checkOutputIdleStatusWatermarkValues(true, false);
+
+        // Step 6
+        combiner.combineWatermark(
+                
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false),
+                1,
+                outputWatermarks::add);
+        checkOutputIdleStatusWatermarkValues(true, false);
+    }
+
+    @Test
+    void testCombineWaitForAllChannels() throws Exception {
+        // The test scenario is as follows:
+        // 
-----------------------------------------------------------------------------
+        //               test scenario     |         expected result
+        // 
-----------------------------------------------------------------------------
+        //    Step | Channel 0 | Channel 1 | output event time | output idle 
status
+        // 
-----------------------------------------------------------------------------
+        //     1   |     1     |           |         []        |    []
+        //     2   |     3     |           |         []        |    []
+        //     3   |           |     2     |         [2]       |    []
+        // 
-----------------------------------------------------------------------------
+
+        // Step 1
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues();
+
+        // Step 2
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(3),
+                0,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues();
+
+        // Step 3
+        combiner.combineWatermark(
+                
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2),
+                1,
+                outputWatermarks::add);
+        checkOutputEventTimeWatermarkValues(2L);
+
+        checkOutputIdleStatusWatermarkValues();
+    }
+
+    private void checkOutputEventTimeWatermarkValues(Long... 
expectedReceivedWatermarkValues) {
+        assertThat(
+                        outputWatermarks.stream()
+                                .filter(w -> w instanceof LongWatermark)
+                                .map(w -> ((LongWatermark) w).getValue()))
+                .containsExactly(expectedReceivedWatermarkValues);
+    }
+
+    private void checkOutputIdleStatusWatermarkValues(Boolean... 
expectedReceivedWatermarkValues) {
+        assertThat(
+                        outputWatermarks.stream()
+                                .filter(w -> w instanceof BoolWatermark)
+                                .map(w -> ((BoolWatermark) w).getValue()))
+                .containsExactly(expectedReceivedWatermarkValues);
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java
new file mode 100644
index 00000000000..2ab186cbd2e
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeWatermarkHandlerTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream.extension.eventtime;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.watermark.BoolWatermark;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link EventTimeWatermarkHandler}. */
+class EventTimeWatermarkHandlerTest {
+
+    private static final List<Watermark> outputWatermarks = new ArrayList<>();
+    private static final List<Long> advancedEventTimes = new ArrayList<>();
+
+    @AfterEach
+    void after() {
+        outputWatermarks.clear();
+        advancedEventTimes.clear();
+    }
+
+    @Test
+    void testOneInputWatermarkHandler() throws Exception {
+        // The test scenario is as follows:
+        // 
-----------------------------------------------------------------------------
+        // test scenario|                   expected result
+        // 
-----------------------------------------------------------------------------
+        //  Step|Input 0|updateStatus|eventTimes| idleStatus 
|advancedEventTimes
+        // 
-----------------------------------------------------------------------------
+        //   1  |   1   |   true,1   |   [1]    |   []       |   [1]
+        //   2  |   2   |   true,2   |   [1,2]  |   []       |   [1,2]
+        //   3  |   1   |  false,-1  |   [1,2]  |   []       |   [1,2]
+        //   4  |  true |  false,-1  |   [1,2]  |  [true]    |   [1,2]
+        //   5  | false |  false,-1  |   [1,2]  |[true,false]|   [1,2]
+        // 
-----------------------------------------------------------------------------
+        // For example, Step 1 indicates that Input 0 will receive an event 
time watermark with a
+        // value of 1.
+        // After Step 1 is executed, the `updateStatus.isEventTimeUpdated` 
returned by the handler
+        // should be true,
+        // and `updateStatus.getNewEventTime` should be equal to 1.
+        // Additionally, the handler should output an event time watermark 
with a value of 1 and
+        // advance the current event time to 2.
+
+        EventTimeWatermarkHandler watermarkHandler =
+                new EventTimeWatermarkHandler(
+                        1, new TestOutput(), new 
TestInternalTimeServiceManager());
+        EventTimeWatermarkHandler.EventTimeUpdateStatus updateStatus;
+
+        // Step 1
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1L), 0);
+        assertThat(updateStatus.isEventTimeUpdated()).isTrue();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(1L);
+        checkOutputEventTimeWatermarkValues(1L);
+        checkOutputIdleStatusWatermarkValues();
+        checkAdvancedEventTimes(1L);
+
+        // Step 2
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2L), 0);
+        assertThat(updateStatus.isEventTimeUpdated()).isTrue();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(2L);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+        checkOutputIdleStatusWatermarkValues();
+        checkAdvancedEventTimes(1L, 2L);
+
+        // Step 3
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1L), 0);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L);
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+        checkOutputIdleStatusWatermarkValues();
+        checkAdvancedEventTimes(1L, 2L);
+
+        // Step 4
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), 0);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+        checkOutputIdleStatusWatermarkValues(true);
+        checkAdvancedEventTimes(1L, 2L);
+
+        // Step 5
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false),
+                        0);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        checkOutputEventTimeWatermarkValues(1L, 2L);
+        checkOutputIdleStatusWatermarkValues(true, false);
+        checkAdvancedEventTimes(1L, 2L);
+    }
+
+    @Test
+    void testTwoInputWatermarkHandler() throws Exception {
+        // The test scenario is as follows:
+        // 
---------------------------------------------------------------------------------
+        //     test scenario        |                   expected result
+        // 
---------------------------------------------------------------------------------
+        //  Step| Input 0 | Input 1 |updateStatus|eventTimes| idleStatus 
|advancedEventTimes
+        // 
---------------------------------------------------------------------------------
+        //   1  |   1     |         |  false,-1  |   []     |   []       |   []
+        //   2  |         |    2    |  true,1    |  [1]     |   []       |   
[1]
+        //   3  |  true   |         |  false,-1  |  [1]     |   []       |   
[1]
+        //   4  |         |  true   |  false,-1  |  [1]     |   [true]   |   
[1]
+        //   5  |         |  false  |  false,-1  |  [1]     |[true,false]|   
[1]
+        // 
-----------------------------------------------------------------------------
+        // For example, Step 1 indicates that Input 0 will receive an event 
time watermark with a
+        // value of 1.
+        // After Step 1 is executed, the `updateStatus.isEventTimeUpdated` 
returned by the handler
+        // should be false,
+        // and `updateStatus.getNewEventTime` should be equal to -1.
+        // Additionally, the handler should not output any event time 
watermark and idle status
+        // watermark.
+
+        EventTimeWatermarkHandler watermarkHandler =
+                new EventTimeWatermarkHandler(
+                        2, new TestOutput(), new 
TestInternalTimeServiceManager());
+        EventTimeWatermarkHandler.EventTimeUpdateStatus updateStatus;
+
+        // Step 1
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(1L), 0);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        checkOutputEventTimeWatermarkValues();
+        checkOutputIdleStatusWatermarkValues();
+        checkAdvancedEventTimes();
+
+        // Step 2
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(2L), 1);
+        assertThat(updateStatus.isEventTimeUpdated()).isTrue();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(1L);
+        checkOutputEventTimeWatermarkValues(1L);
+        checkOutputIdleStatusWatermarkValues();
+        checkAdvancedEventTimes(1L);
+
+        // Step 3
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), 0);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L);
+        checkOutputEventTimeWatermarkValues(1L);
+        checkOutputIdleStatusWatermarkValues();
+        checkAdvancedEventTimes(1L);
+
+        // Step 4
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true), 1);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L);
+        checkOutputEventTimeWatermarkValues(1L);
+        checkOutputIdleStatusWatermarkValues(true);
+        checkAdvancedEventTimes(1L);
+
+        // Step 5
+        updateStatus =
+                watermarkHandler.processWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false),
+                        1);
+        assertThat(updateStatus.isEventTimeUpdated()).isFalse();
+        assertThat(updateStatus.getNewEventTime()).isEqualTo(-1L);
+        checkOutputEventTimeWatermarkValues(1L);
+        checkOutputIdleStatusWatermarkValues(true, false);
+        checkAdvancedEventTimes(1L);
+    }
+
+    private static class TestOutput implements Output<Long> {
+        @Override
+        public void 
emitWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> 
record) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void emitWatermark(WatermarkEvent watermark) {
+            outputWatermarks.add(watermark.getWatermark());
+        }
+
+        @Override
+        public void collect(Long record) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    private static class TestInternalTimeServiceManager
+            implements InternalTimeServiceManager<Long> {
+
+        @Override
+        public <N> InternalTimerService<N> getInternalTimerService(
+                String name,
+                TypeSerializer<Long> keySerializer,
+                TypeSerializer<N> namespaceSerializer,
+                Triggerable<Long, N> triggerable) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <N> InternalTimerService<N> getAsyncInternalTimerService(
+                String name,
+                TypeSerializer<Long> keySerializer,
+                TypeSerializer<N> namespaceSerializer,
+                Triggerable<Long, N> triggerable,
+                AsyncExecutionController<Long> asyncExecutionController) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void 
advanceWatermark(org.apache.flink.streaming.api.watermark.Watermark watermark)
+                throws Exception {
+            advancedEventTimes.add(watermark.getTimestamp());
+        }
+
+        @Override
+        public boolean tryAdvanceWatermark(
+                org.apache.flink.streaming.api.watermark.Watermark watermark,
+                ShouldStopAdvancingFn shouldStopAdvancingFn)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void snapshotToRawKeyedState(
+                KeyedStateCheckpointOutputStream stateCheckpointOutputStream, 
String operatorName)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private void checkOutputEventTimeWatermarkValues(Long... 
expectedReceivedWatermarkValues) {
+        assertThat(
+                        outputWatermarks.stream()
+                                .filter(w -> w instanceof LongWatermark)
+                                .map(w -> ((LongWatermark) w).getValue()))
+                .containsExactly(expectedReceivedWatermarkValues);
+    }
+
+    private void checkOutputIdleStatusWatermarkValues(Boolean... 
expectedReceivedWatermarkValues) {
+        assertThat(
+                        outputWatermarks.stream()
+                                .filter(w -> w instanceof BoolWatermark)
+                                .map(w -> ((BoolWatermark) w).getValue()))
+                .containsExactly(expectedReceivedWatermarkValues);
+    }
+
+    private void checkAdvancedEventTimes(Long... expectedAdvancedEventTimes) {
+        
assertThat(advancedEventTimes).containsExactly(expectedAdvancedEventTimes);
+    }
+}

Reply via email to