This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69e67fa8ca704472545aada9b701e7ca9a0de408
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Thu Jan 16 13:46:42 2025 +0800

    [FLINK-37112][API] Process event time extension related watermarks in 
EventTimeProcessFunction for DataStream V2
---
 .../extension/eventtime/EventTimeExtension.java    | 134 ++++
 .../function/EventTimeProcessFunction.java         |  40 +
 .../OneInputEventTimeStreamProcessFunction.java    |  47 ++
 ...putBroadcastEventTimeStreamProcessFunction.java |  47 ++
 ...NonBroadcastEventTimeStreamProcessFunction.java |  47 ++
 .../TwoOutputEventTimeStreamProcessFunction.java   |  54 ++
 .../eventtime/timer/EventTimeManager.java          |  53 ++
 .../eventtime/EventTimeExtensionImpl.java          |  38 +
 ...ntTimeWrappedOneInputStreamProcessFunction.java | 143 ++++
 ...ppedTwoInputBroadcastStreamProcessFunction.java | 180 +++++
 ...dTwoInputNonBroadcastStreamProcessFunction.java | 179 +++++
 ...tTimeWrappedTwoOutputStreamProcessFunction.java | 156 ++++
 .../eventtime/timer/DefaultEventTimeManager.java   |  72 ++
 .../impl/operators/KeyedProcessOperator.java       |  17 +-
 .../KeyedTwoInputBroadcastProcessOperator.java     |  17 +-
 .../KeyedTwoInputNonBroadcastProcessOperator.java  |  18 +-
 .../operators/KeyedTwoOutputProcessOperator.java   |  21 +-
 .../datastream/impl/operators/ProcessOperator.java |  18 +
 .../TwoInputBroadcastProcessOperator.java          |  20 +
 .../TwoInputNonBroadcastProcessOperator.java       |  25 +
 .../impl/operators/TwoOutputProcessOperator.java   |  19 +
 .../impl/stream/BroadcastStreamImpl.java           |  15 +
 .../flink/datastream/impl/utils/StreamUtils.java   |  31 +
 .../eventtime/EventTimeExtensionITCase.java        | 848 +++++++++++++++++++++
 24 files changed, 2235 insertions(+), 4 deletions(-)

diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
index 9417baef1d5..a0ac5f8c499 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
@@ -23,9 +23,20 @@ import 
org.apache.flink.api.common.watermark.BoolWatermarkDeclaration;
 import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
 import org.apache.flink.api.common.watermark.Watermark;
 import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.EventTimeProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
 import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
 import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import org.apache.flink.datastream.api.function.ProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
 
 /**
  * The entry point for the event-time extension, which provides the following 
functionality:
@@ -47,6 +58,16 @@ import 
org.apache.flink.datastream.api.function.ProcessFunction;
  * source.process(watermarkGeneratorProcessFunction)
  *       .process(...)
  * }</pre>
+ *   <li>provides a tool to encapsulate a user-defined {@link 
EventTimeProcessFunction} to provide
+ *       the relevant components of the event-time extension.
+ *       <pre>{@code
+ * stream.process(
+ *          EventTimeExtension.wrapProcessFunction(
+ *              new CustomEventTimeProcessFunction()
+ *          )
+ *       )
+ *       .process(...)
+ * }</pre>
  * </ul>
  */
 @Experimental
@@ -136,4 +157,117 @@ public class EventTimeExtension {
             EventTimeExtractor<T> eventTimeExtractor) {
         return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor);
     }
+
+    // ======== Wrap user-defined {@link EventTimeProcessFunction} =========
+
+    /**
+     * Wrap the user-defined {@link OneInputEventTimeStreamProcessFunction}, 
which will provide
+     * related components such as {@link EventTimeManager} and declare the 
necessary built-in state
+     * required for the Timer, etc. Note that registering event timers of 
{@link
+     * EventTimeProcessFunction} can only be used with {@link 
KeyedPartitionStream}.
+     *
+     * @param processFunction The user-defined {@link 
OneInputEventTimeStreamProcessFunction} that
+     *     needs to be wrapped.
+     * @return The wrapped {@link OneInputStreamProcessFunction}.
+     */
+    public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> 
wrapProcessFunction(
+            OneInputEventTimeStreamProcessFunction<IN, OUT> processFunction) {
+        try {
+            return (OneInputStreamProcessFunction<IN, OUT>)
+                    getEventTimeExtensionImplClass()
+                            .getMethod(
+                                    "wrapProcessFunction",
+                                    
OneInputEventTimeStreamProcessFunction.class)
+                            .invoke(null, processFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Wrap the user-defined {@link TwoOutputStreamProcessFunction}, which 
will provide related
+     * components such as {@link EventTimeManager} and declare the necessary 
built-in state required
+     * for the Timer, etc. Note that registering event timers of {@link 
EventTimeProcessFunction}
+     * can only be used with {@link KeyedPartitionStream}.
+     *
+     * @param processFunction The user-defined {@link 
TwoOutputEventTimeStreamProcessFunction} that
+     *     needs to be wrapped.
+     * @return The wrapped {@link TwoOutputStreamProcessFunction}.
+     */
+    public static <IN, OUT1, OUT2>
+            TwoOutputStreamProcessFunction<IN, OUT1, OUT2> wrapProcessFunction(
+                    TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> 
processFunction) {
+        try {
+            return (TwoOutputStreamProcessFunction<IN, OUT1, OUT2>)
+                    getEventTimeExtensionImplClass()
+                            .getMethod(
+                                    "wrapProcessFunction",
+                                    
TwoOutputEventTimeStreamProcessFunction.class)
+                            .invoke(null, processFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Wrap the user-defined {@link 
TwoInputNonBroadcastEventTimeStreamProcessFunction}, which will
+     * provide related components such as {@link EventTimeManager} and declare 
the necessary
+     * built-in state required for the Timer, etc. Note that registering event 
timers of {@link
+     * EventTimeProcessFunction} can only be used with {@link 
KeyedPartitionStream}.
+     *
+     * @param processFunction The user-defined {@link
+     *     TwoInputNonBroadcastEventTimeStreamProcessFunction} that needs to 
be wrapped.
+     * @return The wrapped {@link TwoInputNonBroadcastStreamProcessFunction}.
+     */
+    public static <IN1, IN2, OUT>
+            TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> 
wrapProcessFunction(
+                    TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, 
IN2, OUT>
+                            processFunction) {
+        try {
+            return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>)
+                    getEventTimeExtensionImplClass()
+                            .getMethod(
+                                    "wrapProcessFunction",
+                                    
TwoInputNonBroadcastEventTimeStreamProcessFunction.class)
+                            .invoke(null, processFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Wrap the user-defined {@link 
TwoInputBroadcastEventTimeStreamProcessFunction}, which will
+     * provide related components such as {@link EventTimeManager} and declare 
the necessary
+     * built-in state required for the Timer, etc. Note that registering event 
timers of {@link
+     * EventTimeProcessFunction} can only be used with {@link 
KeyedPartitionStream}.
+     *
+     * @param processFunction The user-defined {@link
+     *     TwoInputBroadcastEventTimeStreamProcessFunction} that needs to be 
wrapped.
+     * @return The wrapped {@link TwoInputBroadcastStreamProcessFunction}.
+     */
+    public static <IN1, IN2, OUT>
+            TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> 
wrapProcessFunction(
+                    TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, 
OUT>
+                            processFunction) {
+        try {
+            return (TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT>)
+                    getEventTimeExtensionImplClass()
+                            .getMethod(
+                                    "wrapProcessFunction",
+                                    
TwoInputBroadcastEventTimeStreamProcessFunction.class)
+                            .invoke(null, processFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Get the implementation class of EventTimeExtension. */
+    private static Class<?> getEventTimeExtensionImplClass() {
+        try {
+            return Class.forName(
+                    
"org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Please ensure that flink-datastream in 
your class path");
+        }
+    }
 }
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java
new file mode 100644
index 00000000000..705cedcf321
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/**
+ * The base interface for event time processing, indicating that the {@link 
ProcessFunction} will be
+ * enriched with event time processing functions, such as registering event 
timers and handle event
+ * time watermarks.
+ *
+ * <p>Note that registering event timers can only be used with {@link 
KeyedPartitionStream}.
+ */
+@Experimental
+public interface EventTimeProcessFunction extends ProcessFunction {
+    /**
+     * Initialize the {@link EventTimeProcessFunction} with an instance of 
{@link EventTimeManager}.
+     * Note that this method should be invoked before the open method.
+     */
+    void initEventTimeProcessFunction(EventTimeManager eventTimeManager);
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java
new file mode 100644
index 00000000000..86da2a867d0
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/** The {@link OneInputStreamProcessFunction} that extends with event time 
support. */
+@Experimental
+public interface OneInputEventTimeStreamProcessFunction<IN, OUT>
+        extends EventTimeProcessFunction, OneInputStreamProcessFunction<IN, 
OUT> {
+
+    /**
+     * The {@code #onEventTimeWatermark} method signifies that the 
EventTimeProcessFunction has
+     * received an EventTimeWatermark. Other types of watermarks will be 
processed by the {@code
+     * ProcessFunction#onWatermark} method.
+     */
+    default void onEventTimeWatermark(
+            long watermarkTimestamp, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx)
+            throws Exception {}
+
+    /**
+     * Invoked when an event-time timer fires. Note that it is only used in 
{@link
+     * KeyedPartitionStream}.
+     */
+    default void onEventTimer(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx) {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java
new file mode 100644
index 00000000000..efa62f8a56f
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/** The {@link TwoInputBroadcastStreamProcessFunction} that extends with event 
time support. */
+@Experimental
+public interface TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT>
+        extends EventTimeProcessFunction, 
TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> {
+
+    /**
+     * The {@code #onEventTimeWatermark} method signifies that the 
EventTimeProcessFunction has
+     * received an EventTimeWatermark. Other types of watermarks will be 
processed by the {@code
+     * ProcessFunction#onWatermark} method.
+     */
+    default void onEventTimeWatermark(
+            long watermarkTimestamp, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx)
+            throws Exception {}
+
+    /**
+     * Invoked when an event-time timer fires. Note that it is only used in 
{@link
+     * KeyedPartitionStream}.
+     */
+    default void onEventTimer(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx) {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java
new file mode 100644
index 00000000000..1502825f781
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/** The {@link TwoInputNonBroadcastStreamProcessFunction} that extends with 
event time support. */
+@Experimental
+public interface TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, 
OUT>
+        extends EventTimeProcessFunction, 
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> {
+
+    /**
+     * The {@code #onEventTimeWatermark} method signifies that the 
EventTimeProcessFunction has
+     * received an EventTimeWatermark. Other types of watermarks will be 
processed by the {@code
+     * ProcessFunction#onWatermark} method.
+     */
+    default void onEventTimeWatermark(
+            long watermarkTimestamp, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx)
+            throws Exception {}
+
+    /**
+     * Invoked when an event-time timer fires. Note that it is only used in 
{@link
+     * KeyedPartitionStream}.
+     */
+    default void onEventTimer(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx) {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java
new file mode 100644
index 00000000000..56e33db636f
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
+import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/** The {@link TwoOutputStreamProcessFunction} that extends with event time 
support. */
+@Experimental
+public interface TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2>
+        extends EventTimeProcessFunction, TwoOutputStreamProcessFunction<IN, 
OUT1, OUT2> {
+
+    /**
+     * The {@code #onEventTimeWatermark} method signifies that the 
EventTimeProcessFunction has
+     * received an EventTimeWatermark. Other types of watermarks will be 
processed by the {@code
+     * ProcessFunction#onWatermark} method.
+     */
+    default void onEventTimeWatermark(
+            long watermarkTimestamp,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputNonPartitionedContext<OUT1, OUT2> ctx)
+            throws Exception {}
+
+    /**
+     * Invoked when an event-time timer fires. Note that it is only used in 
{@link
+     * KeyedPartitionStream}.
+     */
+    default void onEventTimer(
+            long timestamp,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx) {}
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java
new file mode 100644
index 00000000000..c5df8e26dea
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.timer;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/**
+ * This class is responsible for managing stuff related to event-time/timer. 
For example, register
+ * and delete event timers, as well as retrieve event time. Note that methods 
for timer can only be
+ * used in {@link KeyedPartitionStream}.
+ */
+@Experimental
+public interface EventTimeManager {
+    /**
+     * Register an event timer for this process function. The {@code 
onEventTimer} method will be
+     * invoked when the event time is reached.
+     *
+     * @param timestamp to trigger timer callback.
+     */
+    void registerTimer(long timestamp);
+
+    /**
+     * Deletes the event-time timer with the given trigger timestamp. This 
method has only an effect
+     * if such a timer was previously registered and did not already expire.
+     *
+     * @param timestamp indicates the timestamp of the timer to delete.
+     */
+    void deleteTimer(long timestamp);
+
+    /**
+     * Get the current event time.
+     *
+     * @return current event time.
+     */
+    long currentTime();
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
index a4a26bc9490..f01972559d6 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
@@ -19,8 +19,19 @@ package org.apache.flink.datastream.impl.extension.eventtime;
 
 import org.apache.flink.api.common.watermark.Watermark;
 import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
 import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction;
 import 
org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
 
 /** The implementation of {@link EventTimeExtension}. */
@@ -37,6 +48,33 @@ public class EventTimeExtensionImpl {
         return new ExtractEventTimeProcessFunction<>(strategy);
     }
 
+    // ============= Wrap Event Time Process Function =============
+
+    public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> 
wrapProcessFunction(
+            OneInputEventTimeStreamProcessFunction<IN, OUT> processFunction) {
+        return new 
EventTimeWrappedOneInputStreamProcessFunction<>(processFunction);
+    }
+
+    public static <IN, OUT1, OUT2>
+            TwoOutputStreamProcessFunction<IN, OUT1, OUT2> wrapProcessFunction(
+                    TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> 
processFunction) {
+        return new 
EventTimeWrappedTwoOutputStreamProcessFunction<>(processFunction);
+    }
+
+    public static <IN1, IN2, OUT>
+            TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> 
wrapProcessFunction(
+                    TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, 
IN2, OUT>
+                            processFunction) {
+        return new 
EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<>(processFunction);
+    }
+
+    public static <IN1, IN2, OUT>
+            TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> 
wrapProcessFunction(
+                    TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, 
OUT>
+                            processFunction) {
+        return new 
EventTimeWrappedTwoInputBroadcastStreamProcessFunction<>(processFunction);
+    }
+
     // ============= Other Utils =============
     public static boolean isEventTimeExtensionWatermark(Watermark watermark) {
         return EventTimeExtension.isEventTimeWatermark(watermark)
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java
new file mode 100644
index 00000000000..b1867af7a39
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.functions;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * The wrapped {@link OneInputEventTimeStreamProcessFunction} that take care 
of event-time alignment
+ * with idleness.
+ */
+public class EventTimeWrappedOneInputStreamProcessFunction<IN, OUT>
+        implements OneInputStreamProcessFunction<IN, OUT> {
+
+    private final OneInputEventTimeStreamProcessFunction<IN, OUT> 
wrappedUserFunction;
+
+    private transient EventTimeManager eventTimeManager;
+
+    private transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
+    public EventTimeWrappedOneInputStreamProcessFunction(
+            OneInputEventTimeStreamProcessFunction<IN, OUT> 
wrappedUserFunction) {
+        this.wrappedUserFunction = 
Preconditions.checkNotNull(wrappedUserFunction);
+    }
+
+    @Override
+    public void open(NonPartitionedContext<OUT> ctx) throws Exception {
+        wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager);
+        wrappedUserFunction.open(ctx);
+    }
+
+    /**
+     * Initialize the event time extension, note that this method should be 
invoked before open
+     * method.
+     */
+    public void initEventTimeExtension(
+            @Nullable InternalTimerService<VoidNamespace> timerService,
+            Supplier<Long> eventTimeSupplier,
+            EventTimeWatermarkHandler eventTimeWatermarkHandler) {
+        this.eventTimeManager = new DefaultEventTimeManager(timerService, 
eventTimeSupplier);
+        this.eventTimeWatermarkHandler = eventTimeWatermarkHandler;
+    }
+
+    @Override
+    public void processRecord(IN record, Collector<OUT> output, 
PartitionedContext<OUT> ctx)
+            throws Exception {
+        wrappedUserFunction.processRecord(record, output, ctx);
+    }
+
+    @Override
+    public void endInput(NonPartitionedContext<OUT> ctx) {
+        wrappedUserFunction.endInput(ctx);
+    }
+
+    @Override
+    public void onProcessingTimer(
+            long timestamp, Collector<OUT> output, PartitionedContext<OUT> 
ctx) {
+        wrappedUserFunction.onProcessingTimer(timestamp, output, ctx);
+    }
+
+    @Override
+    public WatermarkHandlingResult onWatermark(
+            Watermark watermark, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx) {
+        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) {
+            // If the watermark is from the event time extension, process it 
and call {@code
+            // userFunction#onEventTimeWatermark} when the event time is 
updated; otherwise, forward
+            // the watermark.
+            try {
+                EventTimeWatermarkHandler.EventTimeUpdateStatus 
eventTimeUpdateStatus =
+                        eventTimeWatermarkHandler.processWatermark(watermark, 
0);
+                if (eventTimeUpdateStatus.isEventTimeUpdated()) {
+                    wrappedUserFunction.onEventTimeWatermark(
+                            eventTimeUpdateStatus.getNewEventTime(), output, 
ctx);
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+
+            // return POLL to indicate that the watermark has been processed
+            return WatermarkHandlingResult.POLL;
+        } else {
+            return wrappedUserFunction.onWatermark(watermark, output, ctx);
+        }
+    }
+
+    public void onEventTime(long timestamp, Collector<OUT> output, 
PartitionedContext ctx) {
+        wrappedUserFunction.onEventTimer(timestamp, output, ctx);
+    }
+
+    @Override
+    public Set<StateDeclaration> usesStates() {
+        return wrappedUserFunction.usesStates();
+    }
+
+    @Override
+    public Set<? extends WatermarkDeclaration> declareWatermarks() {
+        return wrappedUserFunction.declareWatermarks();
+    }
+
+    @Override
+    public void close() throws Exception {
+        wrappedUserFunction.close();
+    }
+
+    public OneInputStreamProcessFunction<IN, OUT> getWrappedUserFunction() {
+        return wrappedUserFunction;
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java
new file mode 100644
index 00000000000..db3c256482b
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.functions;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * The wrapped {@link TwoInputBroadcastEventTimeStreamProcessFunction} that 
take care of event-time
+ * alignment with idleness.
+ */
+public class EventTimeWrappedTwoInputBroadcastStreamProcessFunction<IN1, IN2, 
OUT>
+        implements TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> {
+
+    private final TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, 
OUT>
+            wrappedUserFunction;
+
+    private transient EventTimeManager eventTimeManager;
+
+    private transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
+    public EventTimeWrappedTwoInputBroadcastStreamProcessFunction(
+            TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> 
wrappedUserFunction) {
+        this.wrappedUserFunction = 
Preconditions.checkNotNull(wrappedUserFunction);
+    }
+
+    @Override
+    public void open(NonPartitionedContext<OUT> ctx) throws Exception {
+        wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager);
+        wrappedUserFunction.open(ctx);
+    }
+
+    /**
+     * Initialize the event time extension, note that this method should be 
invoked before open
+     * method.
+     */
+    public void initEventTimeExtension(
+            @Nullable InternalTimerService<VoidNamespace> timerService,
+            Supplier<Long> eventTimeSupplier,
+            EventTimeWatermarkHandler eventTimeWatermarkHandler) {
+        this.eventTimeManager = new DefaultEventTimeManager(timerService, 
eventTimeSupplier);
+        this.eventTimeWatermarkHandler = eventTimeWatermarkHandler;
+    }
+
+    @Override
+    public void processRecordFromNonBroadcastInput(
+            IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) 
throws Exception {
+        wrappedUserFunction.processRecordFromNonBroadcastInput(record, output, 
ctx);
+    }
+
+    @Override
+    public void processRecordFromBroadcastInput(IN2 record, 
NonPartitionedContext<OUT> ctx)
+            throws Exception {
+        wrappedUserFunction.processRecordFromBroadcastInput(record, ctx);
+    }
+
+    @Override
+    public void endBroadcastInput(NonPartitionedContext<OUT> ctx) {
+        wrappedUserFunction.endBroadcastInput(ctx);
+    }
+
+    @Override
+    public void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {
+        wrappedUserFunction.endNonBroadcastInput(ctx);
+    }
+
+    @Override
+    public void onProcessingTimer(
+            long timestamp, Collector<OUT> output, PartitionedContext<OUT> 
ctx) {
+        wrappedUserFunction.onProcessingTimer(timestamp, output, ctx);
+    }
+
+    @Override
+    public WatermarkHandlingResult onWatermarkFromBroadcastInput(
+            Watermark watermark, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx) {
+        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) {
+            // If the watermark is from the event time extension, process it 
and call {@code
+            // userFunction#onEventTimeWatermark} when the event time is 
updated; otherwise, forward
+            // the watermark.
+            try {
+
+                EventTimeWatermarkHandler.EventTimeUpdateStatus 
eventTimeUpdateStatus =
+                        eventTimeWatermarkHandler.processWatermark(watermark, 
0);
+                if (eventTimeUpdateStatus.isEventTimeUpdated()) {
+                    wrappedUserFunction.onEventTimeWatermark(
+                            eventTimeUpdateStatus.getNewEventTime(), output, 
ctx);
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+            // return POLL to indicate that the watermark has been processed
+            return WatermarkHandlingResult.POLL;
+        } else {
+            return 
wrappedUserFunction.onWatermarkFromBroadcastInput(watermark, output, ctx);
+        }
+    }
+
+    @Override
+    public WatermarkHandlingResult onWatermarkFromNonBroadcastInput(
+            Watermark watermark, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx) {
+        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) {
+            // If the watermark is from the event time extension, process it 
and call {@code
+            // userFunction#onEventTimeWatermark} when the event time is 
updated; otherwise, forward
+            // the watermark.
+            try {
+                EventTimeWatermarkHandler.EventTimeUpdateStatus 
eventTimeUpdateStatus =
+                        eventTimeWatermarkHandler.processWatermark(watermark, 
1);
+                if (eventTimeUpdateStatus.isEventTimeUpdated()) {
+                    wrappedUserFunction.onEventTimeWatermark(
+                            eventTimeUpdateStatus.getNewEventTime(), output, 
ctx);
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+            // return POLL to indicate that the watermark has been processed
+            return WatermarkHandlingResult.POLL;
+        } else {
+            return 
wrappedUserFunction.onWatermarkFromBroadcastInput(watermark, output, ctx);
+        }
+    }
+
+    public void onEventTime(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx) {
+        wrappedUserFunction.onEventTimer(timestamp, output, ctx);
+    }
+
+    @Override
+    public void close() throws Exception {
+        wrappedUserFunction.close();
+    }
+
+    @Override
+    public Set<StateDeclaration> usesStates() {
+        return wrappedUserFunction.usesStates();
+    }
+
+    @Override
+    public Set<? extends WatermarkDeclaration> declareWatermarks() {
+        return wrappedUserFunction.declareWatermarks();
+    }
+
+    public TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> 
getWrappedUserFunction() {
+        return wrappedUserFunction;
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java
new file mode 100644
index 00000000000..a8dc809a90e
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.functions;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * The wrapped {@link TwoInputNonBroadcastEventTimeStreamProcessFunction} that 
take care of
+ * event-time alignment with idleness.
+ */
+public class EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<IN1, 
IN2, OUT>
+        implements TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> {
+
+    private final TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, 
OUT>
+            wrappedUserFunction;
+
+    private transient EventTimeManager eventTimeManager;
+
+    private transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
+    public EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction(
+            TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> 
wrappedUserFunction) {
+        this.wrappedUserFunction = 
Preconditions.checkNotNull(wrappedUserFunction);
+    }
+
+    @Override
+    public void open(NonPartitionedContext<OUT> ctx) throws Exception {
+        wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager);
+        wrappedUserFunction.open(ctx);
+    }
+
+    /**
+     * Initialize the event time extension, note that this method should be 
invoked before open
+     * method.
+     */
+    public void initEventTimeExtension(
+            @Nullable InternalTimerService<VoidNamespace> timerService,
+            Supplier<Long> eventTimeSupplier,
+            EventTimeWatermarkHandler eventTimeWatermarkHandler) {
+        this.eventTimeManager = new DefaultEventTimeManager(timerService, 
eventTimeSupplier);
+        this.eventTimeWatermarkHandler = eventTimeWatermarkHandler;
+    }
+
+    @Override
+    public void processRecordFromFirstInput(
+            IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) 
throws Exception {
+        wrappedUserFunction.processRecordFromFirstInput(record, output, ctx);
+    }
+
+    @Override
+    public void processRecordFromSecondInput(
+            IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) 
throws Exception {
+        wrappedUserFunction.processRecordFromSecondInput(record, output, ctx);
+    }
+
+    @Override
+    public void endFirstInput(NonPartitionedContext<OUT> ctx) {
+        wrappedUserFunction.endFirstInput(ctx);
+    }
+
+    @Override
+    public void endSecondInput(NonPartitionedContext<OUT> ctx) {
+        wrappedUserFunction.endSecondInput(ctx);
+    }
+
+    @Override
+    public void onProcessingTimer(
+            long timestamp, Collector<OUT> output, PartitionedContext<OUT> 
ctx) {
+        wrappedUserFunction.onProcessingTimer(timestamp, output, ctx);
+    }
+
+    @Override
+    public WatermarkHandlingResult onWatermarkFromFirstInput(
+            Watermark watermark, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx) {
+        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) {
+            // If the watermark is from the event time extension, process it 
and call {@code
+            // userFunction#onEventTimeWatermark} when the event time is 
updated; otherwise, forward
+            // the watermark.
+            try {
+                EventTimeWatermarkHandler.EventTimeUpdateStatus 
eventTimeUpdateStatus =
+                        eventTimeWatermarkHandler.processWatermark(watermark, 
0);
+                if (eventTimeUpdateStatus.isEventTimeUpdated()) {
+                    wrappedUserFunction.onEventTimeWatermark(
+                            eventTimeUpdateStatus.getNewEventTime(), output, 
ctx);
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+            // return POLL to indicate that the watermark has been processed
+            return WatermarkHandlingResult.POLL;
+        } else {
+            return wrappedUserFunction.onWatermarkFromFirstInput(watermark, 
output, ctx);
+        }
+    }
+
+    @Override
+    public WatermarkHandlingResult onWatermarkFromSecondInput(
+            Watermark watermark, Collector<OUT> output, 
NonPartitionedContext<OUT> ctx) {
+        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) {
+            // If the watermark is from the event time extension, process it 
and call {@code
+            // userFunction#onEventTimeWatermark} when the event time is 
updated; otherwise, forward
+            // the watermark.
+            try {
+                EventTimeWatermarkHandler.EventTimeUpdateStatus 
eventTimeUpdateStatus =
+                        eventTimeWatermarkHandler.processWatermark(watermark, 
1);
+                if (eventTimeUpdateStatus.isEventTimeUpdated()) {
+                    wrappedUserFunction.onEventTimeWatermark(
+                            eventTimeUpdateStatus.getNewEventTime(), output, 
ctx);
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+            // return POLL to indicate that the watermark has been processed
+            return WatermarkHandlingResult.POLL;
+        } else {
+            return wrappedUserFunction.onWatermarkFromSecondInput(watermark, 
output, ctx);
+        }
+    }
+
+    public void onEventTime(long timestamp, Collector<OUT> output, 
PartitionedContext<OUT> ctx) {
+        wrappedUserFunction.onEventTimer(timestamp, output, ctx);
+    }
+
+    @Override
+    public void close() throws Exception {
+        wrappedUserFunction.close();
+    }
+
+    @Override
+    public Set<StateDeclaration> usesStates() {
+        return wrappedUserFunction.usesStates();
+    }
+
+    @Override
+    public Set<? extends WatermarkDeclaration> declareWatermarks() {
+        return wrappedUserFunction.declareWatermarks();
+    }
+
+    public TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> 
getWrappedUserFunction() {
+        return wrappedUserFunction;
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java
new file mode 100644
index 00000000000..5a272f31588
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.functions;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
+import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * The wrapped {@link TwoOutputEventTimeStreamProcessFunction} that take care 
of event-time
+ * alignment with idleness.
+ */
+public class EventTimeWrappedTwoOutputStreamProcessFunction<IN, OUT1, OUT2>
+        implements TwoOutputStreamProcessFunction<IN, OUT1, OUT2> {
+
+    private final TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> 
wrappedUserFunction;
+
+    private transient EventTimeManager eventTimeManager;
+
+    private transient EventTimeWatermarkHandler eventTimeWatermarkHandler;
+
+    public EventTimeWrappedTwoOutputStreamProcessFunction(
+            TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> 
wrappedUserFunction) {
+        this.wrappedUserFunction = 
Preconditions.checkNotNull(wrappedUserFunction);
+    }
+
+    @Override
+    public void open(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) throws 
Exception {
+        wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager);
+        wrappedUserFunction.open(ctx);
+    }
+
+    /**
+     * Initialize the event time extension, note that this method should be 
invoked before open
+     * method.
+     */
+    public void initEventTimeExtension(
+            @Nullable InternalTimerService<VoidNamespace> timerService,
+            Supplier<Long> eventTimeSupplier,
+            EventTimeWatermarkHandler eventTimeWatermarkHandler) {
+        this.eventTimeManager = new DefaultEventTimeManager(timerService, 
eventTimeSupplier);
+        this.eventTimeWatermarkHandler = eventTimeWatermarkHandler;
+    }
+
+    @Override
+    public void processRecord(
+            IN record,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx)
+            throws Exception {
+        wrappedUserFunction.processRecord(record, output1, output2, ctx);
+    }
+
+    @Override
+    public void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {
+        wrappedUserFunction.endInput(ctx);
+    }
+
+    @Override
+    public void onProcessingTimer(
+            long timestamp,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx) {
+        wrappedUserFunction.onProcessingTimer(timestamp, output1, output2, 
ctx);
+    }
+
+    @Override
+    public WatermarkHandlingResult onWatermark(
+            Watermark watermark,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {
+        if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) {
+            // If the watermark is from the event time extension, process it 
and call {@code
+            // userFunction#onEventTimeWatermark} when the event time is 
updated; otherwise, forward
+            // the watermark.
+            try {
+                EventTimeWatermarkHandler.EventTimeUpdateStatus 
eventTimeUpdateStatus =
+                        eventTimeWatermarkHandler.processWatermark(watermark, 
0);
+                if (eventTimeUpdateStatus.isEventTimeUpdated()) {
+                    wrappedUserFunction.onEventTimeWatermark(
+                            eventTimeUpdateStatus.getNewEventTime(), output1, 
output2, ctx);
+                }
+            } catch (Exception e) {
+                ExceptionUtils.rethrow(e);
+            }
+            // return POLL to indicate that the watermark has been processed
+            return WatermarkHandlingResult.POLL;
+        } else {
+            return wrappedUserFunction.onWatermark(watermark, output1, 
output2, ctx);
+        }
+    }
+
+    public void onEventTime(
+            long timestamp,
+            Collector<OUT1> output1,
+            Collector<OUT2> output2,
+            TwoOutputPartitionedContext<OUT1, OUT2> ctx) {
+        wrappedUserFunction.onEventTimer(timestamp, output1, output2, ctx);
+    }
+
+    @Override
+    public Set<StateDeclaration> usesStates() {
+        return wrappedUserFunction.usesStates();
+    }
+
+    @Override
+    public Set<? extends WatermarkDeclaration> declareWatermarks() {
+        return wrappedUserFunction.declareWatermarks();
+    }
+
+    @Override
+    public void close() throws Exception {
+        wrappedUserFunction.close();
+    }
+
+    public TwoOutputStreamProcessFunction<IN, OUT1, OUT2> 
getWrappedUserFunction() {
+        return wrappedUserFunction;
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java
new file mode 100644
index 00000000000..8beb78a7cc9
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.timer;
+
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+
+import javax.annotation.Nullable;
+
+import java.util.function.Supplier;
+
+/** The implementation of {@link EventTimeManager}. */
+public class DefaultEventTimeManager implements EventTimeManager {
+
+    /**
+     * The timer service of operator, used in register event timer. Note that 
it cloud be null if
+     * the operator is not a keyed operator.
+     */
+    @Nullable private final InternalTimerService<VoidNamespace> timerService;
+
+    /** The supplier of the current event time. */
+    private final Supplier<Long> eventTimeSupplier;
+
+    public DefaultEventTimeManager(
+            @Nullable InternalTimerService<VoidNamespace> timerService,
+            Supplier<Long> eventTimeSupplier) {
+        this.timerService = timerService;
+        this.eventTimeSupplier = eventTimeSupplier;
+    }
+
+    @Override
+    public void registerTimer(long timestamp) {
+        if (timerService == null) {
+            throw new UnsupportedOperationException(
+                    "Registering event timer is not allowed in NonKeyed 
Stream.");
+        }
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void deleteTimer(long timestamp) {
+        if (timerService == null) {
+            throw new UnsupportedOperationException(
+                    "Deleting event timer is not allowed in NonKeyed Stream.");
+        }
+
+        timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public long currentTime() {
+        return eventTimeSupplier.get();
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
index 069b4991f44..23bae4b9921 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -39,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 /** Operator for {@link OneInputStreamProcessFunction} in {@link 
KeyedPartitionStream}. */
 public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, 
OUT>
@@ -84,7 +86,10 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends 
ProcessOperator<IN, OUT>
 
     @Override
     public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
-        // do nothing at the moment.
+        if (userFunction instanceof 
EventTimeWrappedOneInputStreamProcessFunction) {
+            ((EventTimeWrappedOneInputStreamProcessFunction<IN, OUT>) 
userFunction)
+                    .onEventTime(timer.getTimestamp(), getOutputCollector(), 
partitionedContext);
+        }
     }
 
     @Override
@@ -121,4 +126,14 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends 
ProcessOperator<IN, OUT>
     public boolean isAsyncStateProcessingEnabled() {
         return true;
     }
+
+    @Override
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return timerService;
+    }
+
+    @Override
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> timerService.currentWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
index 53cb2dea7d6..bb985643e76 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -39,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 /** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link 
KeyedPartitionStream}. */
 public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
@@ -90,7 +92,10 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, 
IN2, OUT>
 
     @Override
     public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
-        // do nothing at the moment.
+        if (userFunction instanceof 
EventTimeWrappedTwoInputBroadcastStreamProcessFunction) {
+            ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction<IN1, IN2, 
OUT>) userFunction)
+                    .onEventTime(timer.getTimestamp(), getOutputCollector(), 
partitionedContext);
+        }
     }
 
     @Override
@@ -123,4 +128,14 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, 
IN1, IN2, OUT>
     public boolean isAsyncStateProcessingEnabled() {
         return true;
     }
+
+    @Override
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return timerService;
+    }
+
+    @Override
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> timerService.currentWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
index 2644dc11b6a..f6e0f8b71ec 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -39,6 +40,7 @@ import javax.annotation.Nullable;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 /**
  * Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link 
KeyedPartitionStream}.
@@ -92,7 +94,11 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, 
IN1, IN2, OUT>
 
     @Override
     public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
-        // do nothing at the moment.
+        if (userFunction instanceof 
EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) {
+            ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<IN1, 
IN2, OUT>)
+                            userFunction)
+                    .onEventTime(timer.getTimestamp(), getOutputCollector(), 
partitionedContext);
+        }
     }
 
     @Override
@@ -131,4 +137,14 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, 
IN1, IN2, OUT>
     public boolean isAsyncStateProcessingEnabled() {
         return true;
     }
+
+    @Override
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return timerService;
+    }
+
+    @Override
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> timerService.currentWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
index aefc1b1b60d..db9dfc2cb42 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
 import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager;
 import 
org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimer;
@@ -40,6 +41,7 @@ import javax.annotation.Nullable;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 /** */
 public class KeyedTwoOutputProcessOperator<KEY, IN, OUT_MAIN, OUT_SIDE>
@@ -113,7 +115,14 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, 
OUT_MAIN, OUT_SIDE>
 
     @Override
     public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws 
Exception {
-        // do nothing at the moment.
+        if (userFunction instanceof 
EventTimeWrappedTwoOutputStreamProcessFunction) {
+            ((EventTimeWrappedTwoOutputStreamProcessFunction<IN, OUT_MAIN, 
OUT_SIDE>) userFunction)
+                    .onEventTime(
+                            timer.getTimestamp(),
+                            getMainCollector(),
+                            getSideCollector(),
+                            partitionedContext);
+        }
     }
 
     @Override
@@ -146,4 +155,14 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, 
OUT_MAIN, OUT_SIDE>
     public boolean isAsyncStateProcessingEnabled() {
         return true;
     }
+
+    @Override
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return timerService;
+    }
+
+    @Override
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> timerService.currentWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 1782e043414..efb95e1ebcc 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -31,10 +31,13 @@ import 
org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction;
 import 
org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -44,6 +47,7 @@ import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTim
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /** Operator for {@link OneInputStreamProcessFunction}. */
@@ -114,6 +118,12 @@ public class ProcessOperator<IN, OUT>
                             getExecutionConfig(),
                             
partitionedContext.getNonPartitionedContext().getWatermarkManager(),
                             getProcessingTimeService());
+        } else if (userFunction instanceof 
EventTimeWrappedOneInputStreamProcessFunction) {
+            // note that the {@code initEventTimeExtension} in 
EventTimeWrappedProcessFunction
+            // should be invoked before the {@code open}.
+            ((EventTimeWrappedOneInputStreamProcessFunction<IN, OUT>) 
userFunction)
+                    .initEventTimeExtension(
+                            getTimerService(), getEventTimeSupplier(), 
eventTimeWatermarkHandler);
         }
 
         userFunction.open(nonPartitionedContext);
@@ -201,4 +211,12 @@ public class ProcessOperator<IN, OUT>
         // For non-keyed operators, we disable async state processing.
         return false;
     }
+
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return null;
+    }
+
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> eventTimeWatermarkHandler.getLastEmitWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index 926c133f0fe..0a661fdf1e2 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -31,9 +31,12 @@ import 
org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -43,6 +46,7 @@ import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTim
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -108,6 +112,14 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, 
OUT>
         this.eventTimeWatermarkHandler =
                 new EventTimeWatermarkHandler(2, output, timeServiceManager);
 
+        if (userFunction instanceof 
EventTimeWrappedTwoInputBroadcastStreamProcessFunction) {
+            // note that the {@code initEventTimeExtension} in 
EventTimeWrappedProcessFunction
+            // should be invoked before the {@code open}.
+            ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction<IN1, IN2, 
OUT>) userFunction)
+                    .initEventTimeExtension(
+                            getTimerService(), getEventTimeSupplier(), 
eventTimeWatermarkHandler);
+        }
+
         this.userFunction.open(this.nonPartitionedContext);
     }
 
@@ -224,4 +236,12 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, 
OUT>
         // For non-keyed operators, we disable async state processing.
         return false;
     }
+
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return null;
+    }
+
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> eventTimeWatermarkHandler.getLastEmitWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 3d699dbbf2d..05c207a082c 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.watermark.WatermarkHandlingResult;
 import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy;
 import org.apache.flink.datastream.api.context.NonPartitionedContext;
 import org.apache.flink.datastream.api.context.ProcessingTimeManager;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
 import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
 import org.apache.flink.datastream.impl.common.OutputCollector;
 import org.apache.flink.datastream.impl.common.TimestampCollector;
@@ -31,10 +32,14 @@ import 
org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -44,6 +49,7 @@ import 
org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTim
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -111,6 +117,17 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
         this.eventTimeWatermarkHandler =
                 new EventTimeWatermarkHandler(2, output, timeServiceManager);
 
+        if (userFunction instanceof 
EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) {
+            // note that the {@code initEventTimeExtension} in 
EventTimeWrappedProcessFunction
+            // should be invoked before the {@code open}.
+            EventTimeManager eventTimeManager =
+                    new DefaultEventTimeManager(getTimerService(), 
getEventTimeSupplier());
+            ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<IN1, 
IN2, OUT>)
+                            userFunction)
+                    .initEventTimeExtension(
+                            getTimerService(), getEventTimeSupplier(), 
eventTimeWatermarkHandler);
+        }
+
         this.userFunction.open(this.nonPartitionedContext);
     }
 
@@ -227,4 +244,12 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
         // For non-keyed operators, we disable async state processing.
         return false;
     }
+
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return null;
+    }
+
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> eventTimeWatermarkHandler.getLastEmitWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index 91d0c0f074f..cdf36e55ef6 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -32,9 +32,12 @@ import 
org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedCo
 import 
org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
 import 
org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -46,6 +49,7 @@ import org.apache.flink.util.OutputTag;
 import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -119,6 +123,13 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, 
OUT_SIDE>
         
this.partitionedContext.setNonPartitionedContext(nonPartitionedContext);
         this.eventTimeWatermarkHandler =
                 new EventTimeWatermarkHandler(1, output, timeServiceManager);
+        if (userFunction instanceof 
EventTimeWrappedTwoOutputStreamProcessFunction) {
+            // note that the {@code initEventTimeExtension} in 
EventTimeWrappedProcessFunction
+            // should be invoked before the {@code open}.
+            ((EventTimeWrappedTwoOutputStreamProcessFunction<IN, OUT_MAIN, 
OUT_SIDE>) userFunction)
+                    .initEventTimeExtension(
+                            getTimerService(), getEventTimeSupplier(), 
eventTimeWatermarkHandler);
+        }
 
         this.userFunction.open(this.nonPartitionedContext);
     }
@@ -236,4 +247,12 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, 
OUT_SIDE>
         // For non-keyed operators, we disable async state processing.
         return false;
     }
+
+    protected InternalTimerService<VoidNamespace> getTimerService() {
+        return null;
+    }
+
+    protected Supplier<Long> getEventTimeSupplier() {
+        return () -> eventTimeWatermarkHandler.getLastEmitWatermark();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
index 8b02eabdaf3..724dd7b1a64 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java
@@ -63,6 +63,11 @@ public class BroadcastStreamImpl<T> extends 
AbstractDataStream<T> implements Bro
         // no state redistribution mode check is required here, since all 
redistribution modes are
         // acceptable
 
+        other =
+                other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl
+                        ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) 
other)
+                                .getKeyedPartitionStream()
+                        : other;
         TypeInformation<OUT> outTypeInfo =
                 StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
                         processFunction,
@@ -92,6 +97,11 @@ public class BroadcastStreamImpl<T> extends 
AbstractDataStream<T> implements Bro
                 processFunction.usesStates(),
                 new 
HashSet<>(Collections.singletonList(StateDeclaration.RedistributionMode.NONE)));
 
+        other =
+                other instanceof 
ProcessConfigurableAndNonKeyedPartitionStreamImpl
+                        ? ((ProcessConfigurableAndNonKeyedPartitionStreamImpl) 
other)
+                                .getNonKeyedPartitionStream()
+                        : other;
         TypeInformation<OUT> outTypeInfo =
                 StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
                         processFunction,
@@ -118,6 +128,11 @@ public class BroadcastStreamImpl<T> extends 
AbstractDataStream<T> implements Bro
             KeyedPartitionStream<K, T_OTHER> other,
             TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> 
processFunction,
             KeySelector<OUT, K> newKeySelector) {
+        other =
+                other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl
+                        ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) 
other)
+                                .getKeyedPartitionStream()
+                        : other;
         TypeInformation<OUT> outTypeInfo =
                 StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction(
                         processFunction,
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
index 6ec420a8135..3f73608dcf9 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
@@ -34,6 +34,10 @@ import 
org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import 
org.apache.flink.datastream.api.stream.GlobalStream.ProcessConfigurableAndGlobalStream;
 import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction;
 import 
org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction;
 import org.apache.flink.datastream.impl.stream.AbstractDataStream;
 import org.apache.flink.datastream.impl.stream.GlobalStreamImpl;
@@ -66,6 +70,12 @@ public final class StreamUtils {
     public static <IN, OUT> TypeInformation<OUT> 
getOutputTypeForOneInputProcessFunction(
             OneInputStreamProcessFunction<IN, OUT> processFunction,
             TypeInformation<IN> inTypeInformation) {
+        if (processFunction instanceof 
EventTimeWrappedOneInputStreamProcessFunction) {
+            processFunction =
+                    ((EventTimeWrappedOneInputStreamProcessFunction) 
processFunction)
+                            .getWrappedUserFunction();
+        }
+
         return TypeExtractor.getUnaryOperatorReturnType(
                 processFunction,
                 OneInputStreamProcessFunction.class,
@@ -101,6 +111,12 @@ public final class StreamUtils {
                     true);
         }
 
+        if (processFunction instanceof 
EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) {
+            processFunction =
+                    
((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) processFunction)
+                            .getWrappedUserFunction();
+        }
+
         return TypeExtractor.getBinaryOperatorReturnType(
                 processFunction,
                 TwoInputNonBroadcastStreamProcessFunction.class,
@@ -123,6 +139,12 @@ public final class StreamUtils {
                     TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> 
processFunction,
                     TypeInformation<IN1> in1TypeInformation,
                     TypeInformation<IN2> in2TypeInformation) {
+        if (processFunction instanceof 
EventTimeWrappedTwoInputBroadcastStreamProcessFunction) {
+            processFunction =
+                    ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction) 
processFunction)
+                            .getWrappedUserFunction();
+        }
+
         return TypeExtractor.getBinaryOperatorReturnType(
                 processFunction,
                 TwoInputBroadcastStreamProcessFunction.class,
@@ -146,6 +168,15 @@ public final class StreamUtils {
                             TwoOutputStreamProcessFunction<IN, OUT1, OUT2>
                                     twoOutputStreamProcessFunction,
                             TypeInformation<IN> inTypeInformation) {
+
+        if (twoOutputStreamProcessFunction
+                instanceof EventTimeWrappedTwoOutputStreamProcessFunction) {
+            twoOutputStreamProcessFunction =
+                    ((EventTimeWrappedTwoOutputStreamProcessFunction)
+                                    twoOutputStreamProcessFunction)
+                            .getWrappedUserFunction();
+        }
+
         TypeInformation<OUT1> firstOutputType =
                 TypeExtractor.getUnaryOperatorReturnType(
                         twoOutputStreamProcessFunction,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java
new file mode 100644
index 00000000000..a2bb6a78c0f
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java
@@ -0,0 +1,848 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream.extension.eventtime;
+
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
+import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** This ITCase class tests the behavior of {@link EventTimeExtension}. */
+class EventTimeExtensionITCase implements Serializable {
+    private ExecutionEnvironment env;
+    private List<Tuple2<Long, String>> inputRecords =
+            List.of(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, 
"c"));
+    private List<Long> inputEventTimes =
+            inputRecords.stream().map(x -> x.f0).collect(Collectors.toList());
+
+    @BeforeEach
+    void before() throws Exception {
+        env = ExecutionEnvironment.getInstance();
+    }
+
+    @AfterEach
+    void after() {
+        // one input
+        TestOneInputStreamProcessFunction.receivedRecords.clear();
+        TestOneInputStreamProcessFunction.receivedEventTimes.clear();
+        TestOneInputEventTimeStreamProcessFunction.receivedRecords.clear();
+        TestOneInputEventTimeStreamProcessFunction.receivedEventTimes.clear();
+        TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes.clear();
+
+        // two output
+        TestTwoOutputStreamProcessFunction.receivedRecords.clear();
+        TestTwoOutputStreamProcessFunction.receivedEventTimes.clear();
+        TestTwoOutputEventTimeStreamProcessFunction.receivedRecords.clear();
+        TestTwoOutputEventTimeStreamProcessFunction.receivedEventTimes.clear();
+        TestTwoOutputEventTimeStreamProcessFunction.invokedTimerTimes.clear();
+
+        // two input broadcast
+        TestTwoInputBroadcastStreamProcessFunction.receivedRecords.clear();
+        TestTwoInputBroadcastStreamProcessFunction.receivedEventTimes.clear();
+        
TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedRecords.clear();
+        
TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedEventTimes.clear();
+
+        // two input non-broadcast
+        TestTwoInputNonBroadcastStreamProcessFunction.receivedRecords.clear();
+        
TestTwoInputNonBroadcastStreamProcessFunction.receivedEventTimes.clear();
+        
TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedRecords.clear();
+        
TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedEventTimes.clear();
+        
TestTwoInputNonBroadcastEventTimeStreamProcessFunction.invokedTimerTimes.clear();
+    }
+
+    @Test
+    void testWatermarkGeneratorGenerateEventTimeWatermark() throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        source.process(new TestOneInputStreamProcessFunction(true));
+        env.execute("testWatermarkGeneratorGenerateEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    // ============== test one input =================
+
+    @Test
+    void testOneInputProcessFunctionForwardEventTimeWatermark() throws 
Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        source.process(new TestOneInputStreamProcessFunction(false))
+                .process(new TestOneInputStreamProcessFunction(true));
+        env.execute("testOneInputProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    @Test
+    void 
testOneInputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer()
+            throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        source.keyBy(x -> x.f0)
+                .process(
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestOneInputEventTimeStreamProcessFunction(true)));
+        env.execute(
+                
"testOneInputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer");
+
+        assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+        
assertThat(TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes)
+                .containsExactlyElementsOf(
+                        inputEventTimes.stream().map(x -> x + 
1).collect(Collectors.toList()));
+    }
+
+    @Test
+    void testOneInputEventTimeProcessFunctionForwardEventTimeWatermark() 
throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        source.process(
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestOneInputEventTimeStreamProcessFunction(false)))
+                .keyBy(x -> x.f0)
+                .process(
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestOneInputEventTimeStreamProcessFunction(true)));
+        
env.execute("testOneInputEventTimeProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+        
assertThat(TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes)
+                .containsExactlyElementsOf(
+                        inputEventTimes.stream().map(x -> x + 
1).collect(Collectors.toList()));
+    }
+
+    // ============== test two output =================
+
+    @Test
+    void testTwoOutputProcessFunctionForwardEventTimeWatermark() throws 
Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<
+                        Tuple2<Long, String>, Tuple2<Long, String>>
+                twoOutputStream = source.process(new 
TestTwoOutputStreamProcessFunction(false));
+        twoOutputStream.getFirst().process(new 
TestOneInputStreamProcessFunction(true));
+        twoOutputStream
+                .getFirst()
+                .keyBy(x -> x.f0)
+                .process(
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestOneInputEventTimeStreamProcessFunction(true)));
+        env.execute("testTwoOutputProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+        assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    @Test
+    void 
testTwoOutputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer()
+            throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        source.keyBy(x -> x.f0)
+                .process(
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestTwoOutputEventTimeStreamProcessFunction(true)));
+        env.execute(
+                
"testTwoOutputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer");
+
+        assertThat(TestTwoOutputEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestTwoOutputEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+        
assertThat(TestTwoOutputEventTimeStreamProcessFunction.invokedTimerTimes)
+                .containsExactlyElementsOf(
+                        inputEventTimes.stream().map(x -> x + 
1).collect(Collectors.toList()));
+    }
+
+    @Test
+    void testTwoOutputEventTimeProcessFunctionForwardEventTimeWatermark() 
throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source = 
getSourceWithWatermarkGenerator();
+        
NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream<
+                        Tuple2<Long, String>, Tuple2<Long, String>>
+                twoOutputStream =
+                        source.process(
+                                EventTimeExtension.wrapProcessFunction(
+                                        new 
TestTwoOutputEventTimeStreamProcessFunction(false)));
+        twoOutputStream.getFirst().process(new 
TestOneInputStreamProcessFunction(true));
+        twoOutputStream
+                .getFirst()
+                .keyBy(x -> x.f0)
+                .process(
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestOneInputEventTimeStreamProcessFunction(true)));
+        
env.execute("testTwoOutputEventTimeProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+        assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    // ============== test two input broadcast =================
+
+    @Test
+    void testTwoInputBroadcastProcessFunctionForwardEventTimeWatermark() 
throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source1 = 
getSourceWithWatermarkGenerator();
+        NonKeyedPartitionStream<Tuple2<Long, String>> source2 = 
getSourceWithWatermarkGenerator();
+        source1.broadcast()
+                .connectAndProcess(source2, new 
TestTwoInputBroadcastStreamProcessFunction(false))
+                .process(new TestOneInputStreamProcessFunction(true));
+        
env.execute("testTwoInputBroadcastProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    @Test
+    void 
testTwoInputBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer()
+            throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source1 = 
getSourceWithWatermarkGenerator();
+        NonKeyedPartitionStream<Tuple2<Long, String>> source2 = 
getSourceWithWatermarkGenerator();
+        source1.broadcast()
+                .connectAndProcess(
+                        source2,
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestTwoInputBroadcastEventTimeStreamProcessFunction(true)));
+        env.execute(
+                
"testTwoInputBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer");
+
+        
assertThat(TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    @Test
+    void 
testTwoInputBroadcastEventTimeProcessFunctionForwardEventTimeWatermark() throws 
Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source1 = 
getSourceWithWatermarkGenerator();
+        NonKeyedPartitionStream<Tuple2<Long, String>> source2 = 
getSourceWithWatermarkGenerator();
+        source1.broadcast()
+                .connectAndProcess(
+                        source2,
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestTwoInputBroadcastEventTimeStreamProcessFunction(false)))
+                .process(new TestOneInputStreamProcessFunction(true));
+        
env.execute("testTwoInputBroadcastEventTimeProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    // ============== test two input non-broadcast =================
+
+    @Test
+    void testTwoInputNonBroadcastProcessFunctionForwardEventTimeWatermark() 
throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source1 = 
getSourceWithWatermarkGenerator();
+        NonKeyedPartitionStream<Tuple2<Long, String>> source2 = 
getSourceWithWatermarkGenerator();
+        source1.connectAndProcess(source2, new 
TestTwoInputNonBroadcastStreamProcessFunction(false))
+                .process(new TestOneInputStreamProcessFunction(true));
+        
env.execute("testTwoInputNonBroadcastProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    @Test
+    void 
testTwoInputNonBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer()
+            throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source1 = 
getSourceWithWatermarkGenerator();
+        NonKeyedPartitionStream<Tuple2<Long, String>> source2 = 
getSourceWithWatermarkGenerator();
+        source1.keyBy(x -> x.f0)
+                .connectAndProcess(
+                        source2.keyBy(x -> x.f0),
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestTwoInputNonBroadcastEventTimeStreamProcessFunction(true)));
+        env.execute(
+                
"testTwoInputNonBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer");
+
+        
assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        
assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+        
assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.invokedTimerTimes)
+                .containsExactlyElementsOf(
+                        inputEventTimes.stream().map(x -> x + 
1).collect(Collectors.toList()));
+    }
+
+    @Test
+    void 
testTwoInputNonBroadcastEventTimeProcessFunctionForwardEventTimeWatermark()
+            throws Exception {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source1 = 
getSourceWithWatermarkGenerator();
+        NonKeyedPartitionStream<Tuple2<Long, String>> source2 = 
getSourceWithWatermarkGenerator();
+        source1.keyBy(x -> x.f0)
+                .connectAndProcess(
+                        source2.keyBy(x -> x.f0),
+                        EventTimeExtension.wrapProcessFunction(
+                                new 
TestTwoInputNonBroadcastEventTimeStreamProcessFunction(false)))
+                .process(new TestOneInputStreamProcessFunction(true));
+        
env.execute("testTwoInputNonBroadcastEventTimeProcessFunctionForwardEventTimeWatermark");
+
+        assertThat(TestOneInputStreamProcessFunction.receivedRecords)
+                .containsExactlyElementsOf(inputRecords);
+        assertThat(TestOneInputStreamProcessFunction.receivedEventTimes)
+                .containsExactlyElementsOf(inputEventTimes);
+    }
+
+    private static class TestOneInputStreamProcessFunction
+            implements OneInputStreamProcessFunction<Tuple2<Long, String>, 
Tuple2<Long, String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+
+        public TestOneInputStreamProcessFunction(boolean 
needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermark(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            if (EventTimeExtension.isEventTimeWatermark(watermark)
+                    && needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(((LongWatermark) watermark).getValue());
+            }
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+            }
+            output.collect(record);
+        }
+    }
+
+    private static class TestOneInputEventTimeStreamProcessFunction
+            implements OneInputEventTimeStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> invokedTimerTimes = new 
ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+        private EventTimeManager eventTimeManager;
+
+        public TestOneInputEventTimeStreamProcessFunction(
+                boolean needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+                eventTimeManager.registerTimer(record.f0 + 1);
+            }
+            output.collect(record);
+        }
+
+        @Override
+        public void onEventTimeWatermark(
+                long watermarkTimestamp,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(watermarkTimestamp);
+            }
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermark(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            
assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse();
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx) {
+            if (needCollectReceivedWatermarkAndRecord) {
+                invokedTimerTimes.add(timestamp);
+            }
+        }
+    }
+
+    private static class TestTwoOutputStreamProcessFunction
+            implements TwoOutputStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, 
String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+
+        public TestTwoOutputStreamProcessFunction(boolean 
needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermark(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output1,
+                Collector<Tuple2<Long, String>> output2,
+                TwoOutputNonPartitionedContext<Tuple2<Long, String>, 
Tuple2<Long, String>> ctx) {
+            if (EventTimeExtension.isEventTimeWatermark(watermark)
+                    && needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(((LongWatermark) watermark).getValue());
+            }
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output1,
+                Collector<Tuple2<Long, String>> output2,
+                TwoOutputPartitionedContext<Tuple2<Long, String>, Tuple2<Long, 
String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+            }
+            output1.collect(record);
+            output2.collect(record);
+        }
+    }
+
+    private static class TestTwoOutputEventTimeStreamProcessFunction
+            implements TwoOutputEventTimeStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, 
String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> invokedTimerTimes = new 
ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+        private EventTimeManager eventTimeManager;
+
+        public TestTwoOutputEventTimeStreamProcessFunction(
+                boolean needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public void processRecord(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output1,
+                Collector<Tuple2<Long, String>> output2,
+                TwoOutputPartitionedContext<Tuple2<Long, String>, Tuple2<Long, 
String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+                eventTimeManager.registerTimer(record.f0 + 1);
+            }
+            output1.collect(record);
+            output2.collect(record);
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermark(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output1,
+                Collector<Tuple2<Long, String>> output2,
+                TwoOutputNonPartitionedContext<Tuple2<Long, String>, 
Tuple2<Long, String>> ctx) {
+            
assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse();
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public void onEventTimeWatermark(
+                long watermarkTimestamp,
+                Collector<Tuple2<Long, String>> output1,
+                Collector<Tuple2<Long, String>> output2,
+                TwoOutputNonPartitionedContext<Tuple2<Long, String>, 
Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(watermarkTimestamp);
+            }
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp,
+                Collector<Tuple2<Long, String>> output1,
+                Collector<Tuple2<Long, String>> output2,
+                TwoOutputPartitionedContext<Tuple2<Long, String>, Tuple2<Long, 
String>> ctx) {
+            if (needCollectReceivedWatermarkAndRecord) {
+                invokedTimerTimes.add(timestamp);
+            }
+        }
+    }
+
+    private static class TestTwoInputBroadcastStreamProcessFunction
+            implements TwoInputBroadcastStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, 
String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+
+        public TestTwoInputBroadcastStreamProcessFunction(
+                boolean needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public void processRecordFromNonBroadcastInput(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+            }
+            output.collect(record);
+        }
+
+        @Override
+        public void processRecordFromBroadcastInput(
+                Tuple2<Long, String> record, 
NonPartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            // do nothing
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromBroadcastInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            if (EventTimeExtension.isEventTimeWatermark(watermark)
+                    && needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(((LongWatermark) watermark).getValue());
+            }
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromNonBroadcastInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            if (EventTimeExtension.isEventTimeWatermark(watermark)
+                    && needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(((LongWatermark) watermark).getValue());
+            }
+            return WatermarkHandlingResult.PEEK;
+        }
+    }
+
+    private static class TestTwoInputBroadcastEventTimeStreamProcessFunction
+            implements TwoInputBroadcastEventTimeStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, 
String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+        private EventTimeManager eventTimeManager;
+
+        public TestTwoInputBroadcastEventTimeStreamProcessFunction(
+                boolean needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public void processRecordFromNonBroadcastInput(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+            }
+            output.collect(record);
+        }
+
+        @Override
+        public void processRecordFromBroadcastInput(
+                Tuple2<Long, String> record, 
NonPartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            // do nothing
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromBroadcastInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            
assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse();
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromNonBroadcastInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            
assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse();
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public void onEventTimeWatermark(
+                long watermarkTimestamp,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(watermarkTimestamp);
+            }
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx) {
+            throw new UnsupportedOperationException("This function shouldn't 
be invoked.");
+        }
+    }
+
+    private static class TestTwoInputNonBroadcastStreamProcessFunction
+            implements TwoInputNonBroadcastStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, 
String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+
+        public TestTwoInputNonBroadcastStreamProcessFunction(
+                boolean needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public void processRecordFromFirstInput(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+            }
+            output.collect(record);
+        }
+
+        @Override
+        public void processRecordFromSecondInput(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {}
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromFirstInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            if (EventTimeExtension.isEventTimeWatermark(watermark)
+                    && needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(((LongWatermark) watermark).getValue());
+            }
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromSecondInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            return WatermarkHandlingResult.PEEK;
+        }
+    }
+
+    private static class TestTwoInputNonBroadcastEventTimeStreamProcessFunction
+            implements TwoInputNonBroadcastEventTimeStreamProcessFunction<
+                    Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, 
String>> {
+        public static ConcurrentLinkedQueue<Tuple2<Long, String>> 
receivedRecords =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> receivedEventTimes =
+                new ConcurrentLinkedQueue<>();
+        public static ConcurrentLinkedQueue<Long> invokedTimerTimes = new 
ConcurrentLinkedQueue<>();
+        private boolean needCollectReceivedWatermarkAndRecord;
+        private EventTimeManager eventTimeManager;
+
+        public TestTwoInputNonBroadcastEventTimeStreamProcessFunction(
+                boolean needCollectReceivedWatermarkAndRecord) {
+            this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord;
+        }
+
+        @Override
+        public void initEventTimeProcessFunction(EventTimeManager 
eventTimeManager) {
+            this.eventTimeManager = eventTimeManager;
+        }
+
+        @Override
+        public void onEventTimeWatermark(
+                long watermarkTimestamp,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedEventTimes.add(watermarkTimestamp);
+            }
+        }
+
+        @Override
+        public void onEventTimer(
+                long timestamp,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx) {
+            if (needCollectReceivedWatermarkAndRecord) {
+                invokedTimerTimes.add(timestamp);
+            }
+        }
+
+        @Override
+        public void processRecordFromFirstInput(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            if (needCollectReceivedWatermarkAndRecord) {
+                receivedRecords.add(record);
+                eventTimeManager.registerTimer(record.f0 + 1);
+            }
+            output.collect(record);
+        }
+
+        @Override
+        public void processRecordFromSecondInput(
+                Tuple2<Long, String> record,
+                Collector<Tuple2<Long, String>> output,
+                PartitionedContext<Tuple2<Long, String>> ctx)
+                throws Exception {
+            // do nothing
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromFirstInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            
assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse();
+            return WatermarkHandlingResult.PEEK;
+        }
+
+        @Override
+        public WatermarkHandlingResult onWatermarkFromSecondInput(
+                Watermark watermark,
+                Collector<Tuple2<Long, String>> output,
+                NonPartitionedContext<Tuple2<Long, String>> ctx) {
+            
assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse();
+            return WatermarkHandlingResult.PEEK;
+        }
+    }
+
+    private NonKeyedPartitionStream<Tuple2<Long, String>> 
getSourceWithWatermarkGenerator() {
+        NonKeyedPartitionStream<Tuple2<Long, String>> source =
+                env.fromSource(DataStreamV2SourceUtils.fromData(inputRecords), 
"Source")
+                        .withParallelism(1);
+
+        return source.process(
+                EventTimeExtension.<Tuple2<Long, 
String>>newWatermarkGeneratorBuilder(
+                                event -> event.f0)
+                        .perEventWatermark()
+                        .buildAsProcessFunction());
+    }
+}

Reply via email to