This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69e67fa8ca704472545aada9b701e7ca9a0de408 Author: Xu Huang <huangxu.wal...@gmail.com> AuthorDate: Thu Jan 16 13:46:42 2025 +0800 [FLINK-37112][API] Process event time extension related watermarks in EventTimeProcessFunction for DataStream V2 --- .../extension/eventtime/EventTimeExtension.java | 134 ++++ .../function/EventTimeProcessFunction.java | 40 + .../OneInputEventTimeStreamProcessFunction.java | 47 ++ ...putBroadcastEventTimeStreamProcessFunction.java | 47 ++ ...NonBroadcastEventTimeStreamProcessFunction.java | 47 ++ .../TwoOutputEventTimeStreamProcessFunction.java | 54 ++ .../eventtime/timer/EventTimeManager.java | 53 ++ .../eventtime/EventTimeExtensionImpl.java | 38 + ...ntTimeWrappedOneInputStreamProcessFunction.java | 143 ++++ ...ppedTwoInputBroadcastStreamProcessFunction.java | 180 +++++ ...dTwoInputNonBroadcastStreamProcessFunction.java | 179 +++++ ...tTimeWrappedTwoOutputStreamProcessFunction.java | 156 ++++ .../eventtime/timer/DefaultEventTimeManager.java | 72 ++ .../impl/operators/KeyedProcessOperator.java | 17 +- .../KeyedTwoInputBroadcastProcessOperator.java | 17 +- .../KeyedTwoInputNonBroadcastProcessOperator.java | 18 +- .../operators/KeyedTwoOutputProcessOperator.java | 21 +- .../datastream/impl/operators/ProcessOperator.java | 18 + .../TwoInputBroadcastProcessOperator.java | 20 + .../TwoInputNonBroadcastProcessOperator.java | 25 + .../impl/operators/TwoOutputProcessOperator.java | 19 + .../impl/stream/BroadcastStreamImpl.java | 15 + .../flink/datastream/impl/utils/StreamUtils.java | 31 + .../eventtime/EventTimeExtensionITCase.java | 848 +++++++++++++++++++++ 24 files changed, 2235 insertions(+), 4 deletions(-) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java index 9417baef1d5..a0ac5f8c499 100644 --- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java @@ -23,9 +23,20 @@ import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration; import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; import org.apache.flink.api.common.watermark.Watermark; import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.datastream.api.extension.eventtime.function.EventTimeProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor; import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; /** * The entry point for the event-time extension, which provides the following functionality: @@ -47,6 +58,16 @@ import org.apache.flink.datastream.api.function.ProcessFunction; 
* source.process(watermarkGeneratorProcessFunction) * .process(...) * }</pre> + * <li>provides a tool to encapsulate a user-defined {@link EventTimeProcessFunction} to provide + * the relevant components of the event-time extension. + * <pre>{@code + * stream.process( + * EventTimeExtension.wrapProcessFunction( + * new CustomEventTimeProcessFunction() + * ) + * ) + * .process(...) + * }</pre> * </ul> */ @Experimental @@ -136,4 +157,117 @@ public class EventTimeExtension { EventTimeExtractor<T> eventTimeExtractor) { return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor); } + + // ======== Wrap user-defined {@link EventTimeProcessFunction} ========= + + /** + * Wrap the user-defined {@link OneInputEventTimeStreamProcessFunction}, which will provide + * related components such as {@link EventTimeManager} and declare the necessary built-in state + * required for the Timer, etc. Note that registering event timers of {@link + * EventTimeProcessFunction} can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link OneInputEventTimeStreamProcessFunction} that + * needs to be wrapped. + * @return The wrapped {@link OneInputStreamProcessFunction}. + */ + public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> wrapProcessFunction( + OneInputEventTimeStreamProcessFunction<IN, OUT> processFunction) { + try { + return (OneInputStreamProcessFunction<IN, OUT>) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + OneInputEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the user-defined {@link TwoOutputStreamProcessFunction}, which will provide related + * components such as {@link EventTimeManager} and declare the necessary built-in state required + * for the Timer, etc. 
Note that registering event timers of {@link EventTimeProcessFunction} + * can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link TwoOutputEventTimeStreamProcessFunction} that + * needs to be wrapped. + * @return The wrapped {@link TwoOutputStreamProcessFunction}. + */ + public static <IN, OUT1, OUT2> + TwoOutputStreamProcessFunction<IN, OUT1, OUT2> wrapProcessFunction( + TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> processFunction) { + try { + return (TwoOutputStreamProcessFunction<IN, OUT1, OUT2>) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + TwoOutputEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the user-defined {@link TwoInputNonBroadcastEventTimeStreamProcessFunction}, which will + * provide related components such as {@link EventTimeManager} and declare the necessary + * built-in state required for the Timer, etc. Note that registering event timers of {@link + * EventTimeProcessFunction} can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link + * TwoInputNonBroadcastEventTimeStreamProcessFunction} that needs to be wrapped. + * @return The wrapped {@link TwoInputNonBroadcastStreamProcessFunction}. 
+ */ + public static <IN1, IN2, OUT> + TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapProcessFunction( + TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + processFunction) { + try { + return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + TwoInputNonBroadcastEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the user-defined {@link TwoInputBroadcastEventTimeStreamProcessFunction}, which will + * provide related components such as {@link EventTimeManager} and declare the necessary + * built-in state required for the Timer, etc. Note that registering event timers of {@link + * EventTimeProcessFunction} can only be used with {@link KeyedPartitionStream}. + * + * @param processFunction The user-defined {@link + * TwoInputBroadcastEventTimeStreamProcessFunction} that needs to be wrapped. + * @return The wrapped {@link TwoInputBroadcastStreamProcessFunction}. + */ + public static <IN1, IN2, OUT> + TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapProcessFunction( + TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + processFunction) { + try { + return (TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT>) + getEventTimeExtensionImplClass() + .getMethod( + "wrapProcessFunction", + TwoInputBroadcastEventTimeStreamProcessFunction.class) + .invoke(null, processFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Get the implementation class of EventTimeExtension. 
*/ + private static Class<?> getEventTimeExtensionImplClass() { + try { + return Class.forName( + "org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Please ensure that flink-datastream in your class path"); + } + } } diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java new file mode 100644 index 00000000000..705cedcf321 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.ProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** + * The base interface for event time processing, indicating that the {@link ProcessFunction} will be + * enriched with event time processing functions, such as registering event timers and handle event + * time watermarks. + * + * <p>Note that registering event timers can only be used with {@link KeyedPartitionStream}. + */ +@Experimental +public interface EventTimeProcessFunction extends ProcessFunction { + /** + * Initialize the {@link EventTimeProcessFunction} with an instance of {@link EventTimeManager}. + * Note that this method should be invoked before the open method. + */ + void initEventTimeProcessFunction(EventTimeManager eventTimeManager); +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000..86da2a867d0 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link OneInputStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface OneInputEventTimeStreamProcessFunction<IN, OUT> + extends EventTimeProcessFunction, OneInputStreamProcessFunction<IN, OUT> { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, Collector<OUT> output, NonPartitionedContext<OUT> ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. 
+ */ + default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000..efa62f8a56f --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputBroadcastEventTimeStreamProcessFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link TwoInputBroadcastStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + extends EventTimeProcessFunction, TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, Collector<OUT> output, NonPartitionedContext<OUT> ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. 
+ */ + default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000..1502825f781 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoInputNonBroadcastEventTimeStreamProcessFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link TwoInputNonBroadcastStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + extends EventTimeProcessFunction, TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, Collector<OUT> output, NonPartitionedContext<OUT> ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. 
+ */ + default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java new file mode 100644 index 00000000000..56e33db636f --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/TwoOutputEventTimeStreamProcessFunction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.eventtime.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** The {@link TwoOutputStreamProcessFunction} that extends with event time support. */ +@Experimental +public interface TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> + extends EventTimeProcessFunction, TwoOutputStreamProcessFunction<IN, OUT1, OUT2> { + + /** + * The {@code #onEventTimeWatermark} method signifies that the EventTimeProcessFunction has + * received an EventTimeWatermark. Other types of watermarks will be processed by the {@code + * ProcessFunction#onWatermark} method. + */ + default void onEventTimeWatermark( + long watermarkTimestamp, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) + throws Exception {} + + /** + * Invoked when an event-time timer fires. Note that it is only used in {@link + * KeyedPartitionStream}. 
+ */ + default void onEventTimer( + long timestamp, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx) {} +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java new file mode 100644 index 00000000000..c5df8e26dea --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.eventtime.timer; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; + +/** + * This class is responsible for managing stuff related to event-time/timer. For example, register + * and delete event timers, as well as retrieve event time. Note that methods for timer can only be + * used in {@link KeyedPartitionStream}. + */ +@Experimental +public interface EventTimeManager { + /** + * Register an event timer for this process function. 
The {@code onEventTimer} method will be + * invoked when the event time is reached. + * + * @param timestamp to trigger timer callback. + */ + void registerTimer(long timestamp); + + /** + * Deletes the event-time timer with the given trigger timestamp. This method has only an effect + * if such a timer was previously registered and did not already expire. + * + * @param timestamp indicates the timestamp of the timer to delete. + */ + void deleteTimer(long timestamp); + + /** + * Get the current event time. + * + * @return current event time. + */ + long currentTime(); +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java index a4a26bc9490..f01972559d6 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java @@ -19,8 +19,19 @@ package org.apache.flink.datastream.impl.extension.eventtime; import org.apache.flink.api.common.watermark.Watermark; import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy; import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; /** The implementation of {@link EventTimeExtension}. */ @@ -37,6 +48,33 @@ public class EventTimeExtensionImpl { return new ExtractEventTimeProcessFunction<>(strategy); } + // ============= Wrap Event Time Process Function ============= + + public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> wrapProcessFunction( + OneInputEventTimeStreamProcessFunction<IN, OUT> processFunction) { + return new EventTimeWrappedOneInputStreamProcessFunction<>(processFunction); + } + + public static <IN, OUT1, OUT2> + TwoOutputStreamProcessFunction<IN, OUT1, OUT2> wrapProcessFunction( + TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> processFunction) { + return new EventTimeWrappedTwoOutputStreamProcessFunction<>(processFunction); + } + + public static <IN1, IN2, OUT> + TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> wrapProcessFunction( + TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + processFunction) { + return new EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<>(processFunction); + } + + public static <IN1, IN2, OUT> + TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> 
wrapProcessFunction( + TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + processFunction) { + return new EventTimeWrappedTwoInputBroadcastStreamProcessFunction<>(processFunction); + } + // ============= Other Utils ============= public static boolean isEventTimeExtensionWatermark(Watermark watermark) { return EventTimeExtension.isEventTimeWatermark(watermark) diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java new file mode 100644 index 00000000000..b1867af7a39 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedOneInputStreamProcessFunction.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link OneInputEventTimeStreamProcessFunction} that take care of event-time alignment + * with idleness. 
+ */ +public class EventTimeWrappedOneInputStreamProcessFunction<IN, OUT> + implements OneInputStreamProcessFunction<IN, OUT> { + + private final OneInputEventTimeStreamProcessFunction<IN, OUT> wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedOneInputStreamProcessFunction( + OneInputEventTimeStreamProcessFunction<IN, OUT> wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(NonPartitionedContext<OUT> ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method. + */ + public void initEventTimeExtension( + @Nullable InternalTimerService<VoidNamespace> timerService, + Supplier<Long> eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecord(IN record, Collector<OUT> output, PartitionedContext<OUT> ctx) + throws Exception { + wrappedUserFunction.processRecord(record, output, ctx); + } + + @Override + public void endInput(NonPartitionedContext<OUT> ctx) { + wrappedUserFunction.endInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output, ctx); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code
+ // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, pass + // the watermark to the wrapped user function. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermark(watermark, output, ctx); + } + } + + public void onEventTime(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) { + wrappedUserFunction.onEventTimer(timestamp, output, ctx); + } + + @Override + public Set<StateDeclaration> usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set<? extends WatermarkDeclaration> declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + public OneInputStreamProcessFunction<IN, OUT> getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java new file mode 100644 index 00000000000..db3c256482b --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputBroadcastStreamProcessFunction.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import 
org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link TwoInputBroadcastEventTimeStreamProcessFunction} that takes care of event-time + * alignment with idleness. + */ +public class EventTimeWrappedTwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> + implements TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> { + + private final TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedTwoInputBroadcastStreamProcessFunction( + TwoInputBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(NonPartitionedContext<OUT> ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method.
+ */ + public void initEventTimeExtension( + @Nullable InternalTimerService<VoidNamespace> timerService, + Supplier<Long> eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecordFromNonBroadcastInput( + IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception { + wrappedUserFunction.processRecordFromNonBroadcastInput(record, output, ctx); + } + + @Override + public void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) + throws Exception { + wrappedUserFunction.processRecordFromBroadcastInput(record, ctx); + } + + @Override + public void endBroadcastInput(NonPartitionedContext<OUT> ctx) { + wrappedUserFunction.endBroadcastInput(ctx); + } + + @Override + public void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) { + wrappedUserFunction.endNonBroadcastInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output, ctx); + } + + @Override + public WatermarkHandlingResult onWatermarkFromBroadcastInput( + Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, pass + // the watermark to the wrapped user function.
+ try { + + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromBroadcastInput(watermark, output, ctx); + } + } + + @Override + public WatermarkHandlingResult onWatermarkFromNonBroadcastInput( + Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, pass + // the watermark to the wrapped user function. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 1); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromNonBroadcastInput(watermark, output, ctx); + } + } + + public void onEventTime(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) { + wrappedUserFunction.onEventTimer(timestamp, output, ctx); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + @Override + public Set<StateDeclaration> usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set<?
extends WatermarkDeclaration> declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + public TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java new file mode 100644 index 00000000000..a8dc809a90e --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import java.util.function.Supplier; + +/** + * The wrapped {@link TwoInputNonBroadcastEventTimeStreamProcessFunction} that takes care of + * event-time alignment with idleness.
+ */ +public class EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> + implements TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> { + + private final TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> + wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction( + TwoInputNonBroadcastEventTimeStreamProcessFunction<IN1, IN2, OUT> wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(NonPartitionedContext<OUT> ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method. + */ + public void initEventTimeExtension( + @Nullable InternalTimerService<VoidNamespace> timerService, + Supplier<Long> eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecordFromFirstInput( + IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception { + wrappedUserFunction.processRecordFromFirstInput(record, output, ctx); + } + + @Override + public void processRecordFromSecondInput( + IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception { + wrappedUserFunction.processRecordFromSecondInput(record, output, ctx); + } + + @Override + public void endFirstInput(NonPartitionedContext<OUT> ctx) { + wrappedUserFunction.endFirstInput(ctx); + } + + @Override + public void endSecondInput(NonPartitionedContext<OUT> ctx) { + wrappedUserFunction.endSecondInput(ctx); + } + + @Override
+ public void onProcessingTimer( + long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output, ctx); + } + + @Override + public WatermarkHandlingResult onWatermarkFromFirstInput( + Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, pass + // the watermark to the wrapped user function. + try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromFirstInput(watermark, output, ctx); + } + } + + @Override + public WatermarkHandlingResult onWatermarkFromSecondInput( + Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, pass + // the watermark to the wrapped user function.
+ try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 1); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermarkFromSecondInput(watermark, output, ctx); + } + } + + public void onEventTime(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) { + wrappedUserFunction.onEventTimer(timestamp, output, ctx); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + @Override + public Set<StateDeclaration> usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set<? extends WatermarkDeclaration> declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + public TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java new file mode 100644 index 00000000000..5a272f31588 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/EventTimeWrappedTwoOutputStreamProcessFunction.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.functions; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Set; +import 
java.util.function.Supplier; + +/** + * The wrapped {@link TwoOutputEventTimeStreamProcessFunction} that takes care of event-time + * alignment with idleness. + */ +public class EventTimeWrappedTwoOutputStreamProcessFunction<IN, OUT1, OUT2> + implements TwoOutputStreamProcessFunction<IN, OUT1, OUT2> { + + private final TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> wrappedUserFunction; + + private transient EventTimeManager eventTimeManager; + + private transient EventTimeWatermarkHandler eventTimeWatermarkHandler; + + public EventTimeWrappedTwoOutputStreamProcessFunction( + TwoOutputEventTimeStreamProcessFunction<IN, OUT1, OUT2> wrappedUserFunction) { + this.wrappedUserFunction = Preconditions.checkNotNull(wrappedUserFunction); + } + + @Override + public void open(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) throws Exception { + wrappedUserFunction.initEventTimeProcessFunction(eventTimeManager); + wrappedUserFunction.open(ctx); + } + + /** + * Initialize the event time extension, note that this method should be invoked before open + * method.
+ */ + public void initEventTimeExtension( + @Nullable InternalTimerService<VoidNamespace> timerService, + Supplier<Long> eventTimeSupplier, + EventTimeWatermarkHandler eventTimeWatermarkHandler) { + this.eventTimeManager = new DefaultEventTimeManager(timerService, eventTimeSupplier); + this.eventTimeWatermarkHandler = eventTimeWatermarkHandler; + } + + @Override + public void processRecord( + IN record, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx) + throws Exception { + wrappedUserFunction.processRecord(record, output1, output2, ctx); + } + + @Override + public void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) { + wrappedUserFunction.endInput(ctx); + } + + @Override + public void onProcessingTimer( + long timestamp, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx) { + wrappedUserFunction.onProcessingTimer(timestamp, output1, output2, ctx); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) { + if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermark)) { + // If the watermark is from the event time extension, process it and call {@code + // userFunction#onEventTimeWatermark} when the event time is updated; otherwise, pass + // the watermark to the wrapped user function.
+ try { + EventTimeWatermarkHandler.EventTimeUpdateStatus eventTimeUpdateStatus = + eventTimeWatermarkHandler.processWatermark(watermark, 0); + if (eventTimeUpdateStatus.isEventTimeUpdated()) { + wrappedUserFunction.onEventTimeWatermark( + eventTimeUpdateStatus.getNewEventTime(), output1, output2, ctx); + } + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + // return POLL to indicate that the watermark has been processed + return WatermarkHandlingResult.POLL; + } else { + return wrappedUserFunction.onWatermark(watermark, output1, output2, ctx); + } + } + + public void onEventTime( + long timestamp, + Collector<OUT1> output1, + Collector<OUT2> output2, + TwoOutputPartitionedContext<OUT1, OUT2> ctx) { + wrappedUserFunction.onEventTimer(timestamp, output1, output2, ctx); + } + + @Override + public Set<StateDeclaration> usesStates() { + return wrappedUserFunction.usesStates(); + } + + @Override + public Set<? extends WatermarkDeclaration> declareWatermarks() { + return wrappedUserFunction.declareWatermarks(); + } + + @Override + public void close() throws Exception { + wrappedUserFunction.close(); + } + + public TwoOutputStreamProcessFunction<IN, OUT1, OUT2> getWrappedUserFunction() { + return wrappedUserFunction; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java new file mode 100644 index 00000000000..8beb78a7cc9 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/timer/DefaultEventTimeManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.eventtime.timer; + +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimerService; + +import javax.annotation.Nullable; + +import java.util.function.Supplier; + +/** The implementation of {@link EventTimeManager}. */ +public class DefaultEventTimeManager implements EventTimeManager { + + /** + * The timer service of the operator, used to register event timers. Note that it could be null + * if the operator is not a keyed operator. + */ + @Nullable private final InternalTimerService<VoidNamespace> timerService; + + /** The supplier of the current event time.
*/ + private final Supplier<Long> eventTimeSupplier; + + public DefaultEventTimeManager( + @Nullable InternalTimerService<VoidNamespace> timerService, + Supplier<Long> eventTimeSupplier) { + this.timerService = timerService; + this.eventTimeSupplier = eventTimeSupplier; + } + + @Override + public void registerTimer(long timestamp) { + if (timerService == null) { + throw new UnsupportedOperationException( + "Registering event timer is not allowed in NonKeyed Stream."); + } + + timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp); + } + + @Override + public void deleteTimer(long timestamp) { + if (timerService == null) { + throw new UnsupportedOperationException( + "Deleting event timer is not allowed in NonKeyed Stream."); + } + + timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, timestamp); + } + + @Override + public long currentTime() { + return eventTimeSupplier.get(); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java index 069b4991f44..23bae4b9921 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -39,6 +40,7
@@ import javax.annotation.Nullable; import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT> @@ -84,7 +86,10 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT> @Override public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) { + ((EventTimeWrappedOneInputStreamProcessFunction<IN, OUT>) userFunction) + .onEventTime(timer.getTimestamp(), getOutputCollector(), partitionedContext); + } } @Override @@ -121,4 +126,14 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT> public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService<VoidNamespace> getTimerService() { + return timerService; + } + + @Override + protected Supplier<Long> getEventTimeSupplier() { + return () -> timerService.currentWatermark(); // re-reads timerService on every call + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java index 53cb2dea7d6..bb985643e76 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import
org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -39,6 +40,7 @@ import javax.annotation.Nullable; import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT> @@ -90,7 +92,10 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT> @Override public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedTwoInputBroadcastStreamProcessFunction) { + ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT>) userFunction) + .onEventTime(timer.getTimestamp(), getOutputCollector(), partitionedContext); + } } @Override @@ -123,4 +128,14 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT> public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService<VoidNamespace> getTimerService() { + return timerService; + } + + @Override + protected Supplier<Long> getEventTimeSupplier() { + return () -> timerService.currentWatermark(); // re-reads timerService on every call + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java index 2644dc11b6a..f6e0f8b71ec 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java +++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java @@ -28,6 +28,7 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -39,6 +40,7 @@ import javax.annotation.Nullable; import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** * Operator for {@link TwoInputNonBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. @@ -92,7 +94,11 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, IN1, IN2, OUT> @Override public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { - // do nothing at the moment.
+ if (userFunction instanceof EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) { + ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>) + userFunction) + .onEventTime(timer.getTimestamp(), getOutputCollector(), partitionedContext); + } } @Override @@ -131,4 +137,14 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, IN1, IN2, OUT> public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService<VoidNamespace> getTimerService() { + return timerService; + } + + @Override + protected Supplier<Long> getEventTimeSupplier() { + return () -> timerService.currentWatermark(); // re-reads timerService on every call + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java index aefc1b1b60d..db9dfc2cb42 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java @@ -27,6 +27,7 @@ import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; import org.apache.flink.datastream.impl.context.DefaultProcessingTimeManager; import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.InternalTimer; @@ -40,6 +41,7 @@ import javax.annotation.Nullable; import java.util.HashSet; import java.util.Set; +import java.util.function.Supplier; /** */ public class KeyedTwoOutputProcessOperator<KEY, IN, OUT_MAIN, OUT_SIDE>
@@ -113,7 +115,14 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, OUT_MAIN, OUT_SIDE> @Override public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception { - // do nothing at the moment. + if (userFunction instanceof EventTimeWrappedTwoOutputStreamProcessFunction) { + ((EventTimeWrappedTwoOutputStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE>) userFunction) + .onEventTime( + timer.getTimestamp(), + getMainCollector(), + getSideCollector(), + partitionedContext); + } } @Override @@ -146,4 +155,14 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, OUT_MAIN, OUT_SIDE> public boolean isAsyncStateProcessingEnabled() { return true; } + + @Override + protected InternalTimerService<VoidNamespace> getTimerService() { + return timerService; + } + + @Override + protected Supplier<Long> getEventTimeSupplier() { + return () -> timerService.currentWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 1782e043414..efb95e1ebcc 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -31,10 +31,13 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; import org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import 
org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -44,6 +47,7 @@ import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTim import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** Operator for {@link OneInputStreamProcessFunction}. */ @@ -114,6 +118,12 @@ public class ProcessOperator<IN, OUT> getExecutionConfig(), partitionedContext.getNonPartitionedContext().getWatermarkManager(), getProcessingTimeService()); + } else if (userFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + ((EventTimeWrappedOneInputStreamProcessFunction<IN, OUT>) userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); } userFunction.open(nonPartitionedContext); @@ -201,4 +211,12 @@ public class ProcessOperator<IN, OUT> // For non-keyed operators, we disable async state processing. 
return false; } + + protected InternalTimerService<VoidNamespace> getTimerService() { + return null; + } + + protected Supplier<Long> getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index 926c133f0fe..0a661fdf1e2 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -31,9 +31,12 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -43,6 +46,7 @@ import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTim import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import 
java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -108,6 +112,14 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT> this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(2, output, timeServiceManager); + if (userFunction instanceof EventTimeWrappedTwoInputBroadcastStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT>) userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); + } + this.userFunction.open(this.nonPartitionedContext); } @@ -224,4 +236,12 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT> // For non-keyed operators, we disable async state processing. return false; } + + protected InternalTimerService<VoidNamespace> getTimerService() { + return null; + } + + protected Supplier<Long> getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index 3d699dbbf2d..05c207a082c 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.watermark.WatermarkHandlingResult; import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy; import org.apache.flink.datastream.api.context.NonPartitionedContext; import org.apache.flink.datastream.api.context.ProcessingTimeManager; +import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; import org.apache.flink.datastream.impl.common.OutputCollector; import org.apache.flink.datastream.impl.common.TimestampCollector; @@ -31,10 +32,14 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.timer.DefaultEventTimeManager; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.event.WatermarkEvent; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -44,6 +49,7 @@ import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTim import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -111,6 +117,17 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT> this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(2, output, timeServiceManager); + if (userFunction 
instanceof EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + EventTimeManager eventTimeManager = + new DefaultEventTimeManager(getTimerService(), getEventTimeSupplier()); + ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>) + userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); + } + this.userFunction.open(this.nonPartitionedContext); } @@ -227,4 +244,12 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, OUT> // For non-keyed operators, we disable async state processing. return false; } + + protected InternalTimerService<VoidNamespace> getTimerService() { + return null; + } + + protected Supplier<Long> getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index 91d0c0f074f..cdf36e55ef6 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -32,9 +32,12 @@ import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedCo import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; 
import org.apache.flink.runtime.event.WatermarkEvent; +import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -46,6 +49,7 @@ import org.apache.flink.util.OutputTag; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -119,6 +123,13 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE> this.partitionedContext.setNonPartitionedContext(nonPartitionedContext); this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(1, output, timeServiceManager); + if (userFunction instanceof EventTimeWrappedTwoOutputStreamProcessFunction) { + // note that the {@code initEventTimeExtension} in EventTimeWrappedProcessFunction + // should be invoked before the {@code open}. + ((EventTimeWrappedTwoOutputStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE>) userFunction) + .initEventTimeExtension( + getTimerService(), getEventTimeSupplier(), eventTimeWatermarkHandler); + } this.userFunction.open(this.nonPartitionedContext); } @@ -236,4 +247,12 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE> // For non-keyed operators, we disable async state processing. 
return false; } + + protected InternalTimerService<VoidNamespace> getTimerService() { + return null; + } + + protected Supplier<Long> getEventTimeSupplier() { + return () -> eventTimeWatermarkHandler.getLastEmitWatermark(); + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java index 8b02eabdaf3..724dd7b1a64 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/BroadcastStreamImpl.java @@ -63,6 +63,11 @@ public class BroadcastStreamImpl<T> extends AbstractDataStream<T> implements Bro // no state redistribution mode check is required here, since all redistribution modes are // acceptable + other = + other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) + .getKeyedPartitionStream() + : other; TypeInformation<OUT> outTypeInfo = StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction( processFunction, @@ -92,6 +97,11 @@ public class BroadcastStreamImpl<T> extends AbstractDataStream<T> implements Bro processFunction.usesStates(), new HashSet<>(Collections.singletonList(StateDeclaration.RedistributionMode.NONE))); + other = + other instanceof ProcessConfigurableAndNonKeyedPartitionStreamImpl + ? 
((ProcessConfigurableAndNonKeyedPartitionStreamImpl) other) + .getNonKeyedPartitionStream() + : other; TypeInformation<OUT> outTypeInfo = StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction( processFunction, @@ -118,6 +128,11 @@ public class BroadcastStreamImpl<T> extends AbstractDataStream<T> implements Bro KeyedPartitionStream<K, T_OTHER> other, TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction, KeySelector<OUT, K> newKeySelector) { + other = + other instanceof ProcessConfigurableAndKeyedPartitionStreamImpl + ? ((ProcessConfigurableAndKeyedPartitionStreamImpl) other) + .getKeyedPartitionStream() + : other; TypeInformation<OUT> outTypeInfo = StreamUtils.getOutputTypeForTwoInputBroadcastProcessFunction( processFunction, diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java index 6ec420a8135..3f73608dcf9 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java @@ -34,6 +34,10 @@ import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.datastream.api.stream.GlobalStream.ProcessConfigurableAndGlobalStream; import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedOneInputStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction; +import 
org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction; import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction; import org.apache.flink.datastream.impl.stream.AbstractDataStream; import org.apache.flink.datastream.impl.stream.GlobalStreamImpl; @@ -66,6 +70,12 @@ public final class StreamUtils { public static <IN, OUT> TypeInformation<OUT> getOutputTypeForOneInputProcessFunction( OneInputStreamProcessFunction<IN, OUT> processFunction, TypeInformation<IN> inTypeInformation) { + if (processFunction instanceof EventTimeWrappedOneInputStreamProcessFunction) { + processFunction = + ((EventTimeWrappedOneInputStreamProcessFunction) processFunction) + .getWrappedUserFunction(); + } + return TypeExtractor.getUnaryOperatorReturnType( processFunction, OneInputStreamProcessFunction.class, @@ -101,6 +111,12 @@ public final class StreamUtils { true); } + if (processFunction instanceof EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) { + processFunction = + ((EventTimeWrappedTwoInputNonBroadcastStreamProcessFunction) processFunction) + .getWrappedUserFunction(); + } + return TypeExtractor.getBinaryOperatorReturnType( processFunction, TwoInputNonBroadcastStreamProcessFunction.class, @@ -123,6 +139,12 @@ public final class StreamUtils { TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> processFunction, TypeInformation<IN1> in1TypeInformation, TypeInformation<IN2> in2TypeInformation) { + if (processFunction instanceof EventTimeWrappedTwoInputBroadcastStreamProcessFunction) { + processFunction = + ((EventTimeWrappedTwoInputBroadcastStreamProcessFunction) processFunction) + .getWrappedUserFunction(); + } + return TypeExtractor.getBinaryOperatorReturnType( processFunction, TwoInputBroadcastStreamProcessFunction.class, @@ -146,6 +168,15 @@ public final class StreamUtils { TwoOutputStreamProcessFunction<IN, OUT1, OUT2> twoOutputStreamProcessFunction, TypeInformation<IN> 
inTypeInformation) { + + if (twoOutputStreamProcessFunction + instanceof EventTimeWrappedTwoOutputStreamProcessFunction) { + twoOutputStreamProcessFunction = + ((EventTimeWrappedTwoOutputStreamProcessFunction) + twoOutputStreamProcessFunction) + .getWrappedUserFunction(); + } + TypeInformation<OUT1> firstOutputType = TypeExtractor.getUnaryOperatorReturnType( twoOutputStreamProcessFunction, diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java new file mode 100644 index 00000000000..a2bb6a78c0f --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/extension/eventtime/EventTimeExtensionITCase.java @@ -0,0 +1,848 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.streaming.api.datastream.extension.eventtime; + +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.Serializable; 
+import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** This ITCase class tests the behavior of {@link EventTimeExtension}. */ +class EventTimeExtensionITCase implements Serializable { + private ExecutionEnvironment env; + private List<Tuple2<Long, String>> inputRecords = + List.of(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"), Tuple2.of(3L, "c")); + private List<Long> inputEventTimes = + inputRecords.stream().map(x -> x.f0).collect(Collectors.toList()); + + @BeforeEach + void before() throws Exception { + env = ExecutionEnvironment.getInstance(); + } + + @AfterEach + void after() { + // one input + TestOneInputStreamProcessFunction.receivedRecords.clear(); + TestOneInputStreamProcessFunction.receivedEventTimes.clear(); + TestOneInputEventTimeStreamProcessFunction.receivedRecords.clear(); + TestOneInputEventTimeStreamProcessFunction.receivedEventTimes.clear(); + TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes.clear(); + + // two output + TestTwoOutputStreamProcessFunction.receivedRecords.clear(); + TestTwoOutputStreamProcessFunction.receivedEventTimes.clear(); + TestTwoOutputEventTimeStreamProcessFunction.receivedRecords.clear(); + TestTwoOutputEventTimeStreamProcessFunction.receivedEventTimes.clear(); + TestTwoOutputEventTimeStreamProcessFunction.invokedTimerTimes.clear(); + + // two input broadcast + TestTwoInputBroadcastStreamProcessFunction.receivedRecords.clear(); + TestTwoInputBroadcastStreamProcessFunction.receivedEventTimes.clear(); + TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedRecords.clear(); + TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedEventTimes.clear(); + + // two input non-broadcast + TestTwoInputNonBroadcastStreamProcessFunction.receivedRecords.clear(); + TestTwoInputNonBroadcastStreamProcessFunction.receivedEventTimes.clear(); + 
TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedRecords.clear(); + TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedEventTimes.clear(); + TestTwoInputNonBroadcastEventTimeStreamProcessFunction.invokedTimerTimes.clear(); + } + + @Test + void testWatermarkGeneratorGenerateEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + source.process(new TestOneInputStreamProcessFunction(true)); + env.execute("testWatermarkGeneratorGenerateEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + // ============== test one input ================= + + @Test + void testOneInputProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + source.process(new TestOneInputStreamProcessFunction(false)) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testOneInputProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testOneInputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + source.keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute( + "testOneInputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + 
.containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + @Test + void testOneInputEventTimeProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + source.process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(false))) + .keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute("testOneInputEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + // ============== test two output ================= + + @Test + void testTwoOutputProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Tuple2<Long, String>, Tuple2<Long, String>> + twoOutputStream = source.process(new TestTwoOutputStreamProcessFunction(false)); + twoOutputStream.getFirst().process(new TestOneInputStreamProcessFunction(true)); + twoOutputStream + .getFirst() + .keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + 
env.execute("testTwoOutputProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoOutputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + source.keyBy(x -> x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestTwoOutputEventTimeStreamProcessFunction(true))); + env.execute( + "testTwoOutputEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestTwoOutputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestTwoOutputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestTwoOutputEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + @Test + void testTwoOutputEventTimeProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream< + Tuple2<Long, String>, Tuple2<Long, String>> + twoOutputStream = + source.process( + EventTimeExtension.wrapProcessFunction( + new TestTwoOutputEventTimeStreamProcessFunction(false))); + twoOutputStream.getFirst().process(new TestOneInputStreamProcessFunction(true)); + twoOutputStream + .getFirst() + .keyBy(x -> 
x.f0) + .process( + EventTimeExtension.wrapProcessFunction( + new TestOneInputEventTimeStreamProcessFunction(true))); + env.execute("testTwoOutputEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + // ============== test two input broadcast ================= + + @Test + void testTwoInputBroadcastProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream<Tuple2<Long, String>> source2 = getSourceWithWatermarkGenerator(); + source1.broadcast() + .connectAndProcess(source2, new TestTwoInputBroadcastStreamProcessFunction(false)) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputBroadcastProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoInputBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream<Tuple2<Long, String>> source2 = getSourceWithWatermarkGenerator(); + source1.broadcast() + .connectAndProcess( + source2, + EventTimeExtension.wrapProcessFunction( + new TestTwoInputBroadcastEventTimeStreamProcessFunction(true))); + env.execute( + 
"testTwoInputBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestTwoInputBroadcastEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoInputBroadcastEventTimeProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream<Tuple2<Long, String>> source2 = getSourceWithWatermarkGenerator(); + source1.broadcast() + .connectAndProcess( + source2, + EventTimeExtension.wrapProcessFunction( + new TestTwoInputBroadcastEventTimeStreamProcessFunction(false))) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputBroadcastEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + // ============== test two input non-broadcast ================= + + @Test + void testTwoInputNonBroadcastProcessFunctionForwardEventTimeWatermark() throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream<Tuple2<Long, String>> source2 = getSourceWithWatermarkGenerator(); + source1.connectAndProcess(source2, new TestTwoInputNonBroadcastStreamProcessFunction(false)) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputNonBroadcastProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + 
.containsExactlyElementsOf(inputEventTimes); + } + + @Test + void testTwoInputNonBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer() + throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream<Tuple2<Long, String>> source2 = getSourceWithWatermarkGenerator(); + source1.keyBy(x -> x.f0) + .connectAndProcess( + source2.keyBy(x -> x.f0), + EventTimeExtension.wrapProcessFunction( + new TestTwoInputNonBroadcastEventTimeStreamProcessFunction(true))); + env.execute( + "testTwoInputNonBroadcastEventTimeProcessFunctionReceiveEventTimeWatermarkAndRegisterTimer"); + + assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + assertThat(TestTwoInputNonBroadcastEventTimeStreamProcessFunction.invokedTimerTimes) + .containsExactlyElementsOf( + inputEventTimes.stream().map(x -> x + 1).collect(Collectors.toList())); + } + + @Test + void testTwoInputNonBroadcastEventTimeProcessFunctionForwardEventTimeWatermark() + throws Exception { + NonKeyedPartitionStream<Tuple2<Long, String>> source1 = getSourceWithWatermarkGenerator(); + NonKeyedPartitionStream<Tuple2<Long, String>> source2 = getSourceWithWatermarkGenerator(); + source1.keyBy(x -> x.f0) + .connectAndProcess( + source2.keyBy(x -> x.f0), + EventTimeExtension.wrapProcessFunction( + new TestTwoInputNonBroadcastEventTimeStreamProcessFunction(false))) + .process(new TestOneInputStreamProcessFunction(true)); + env.execute("testTwoInputNonBroadcastEventTimeProcessFunctionForwardEventTimeWatermark"); + + assertThat(TestOneInputStreamProcessFunction.receivedRecords) + .containsExactlyElementsOf(inputRecords); + assertThat(TestOneInputStreamProcessFunction.receivedEventTimes) + .containsExactlyElementsOf(inputEventTimes); + } + + 
private static class TestOneInputStreamProcessFunction + implements OneInputStreamProcessFunction<Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestOneInputStreamProcessFunction(boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public void processRecord( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + } + + private static class TestOneInputEventTimeStreamProcessFunction + implements OneInputEventTimeStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> invokedTimerTimes = new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestOneInputEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + 
this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecord( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + eventTimeManager.registerTimer(record.f0 + 1); + } + output.collect(record); + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public void onEventTimer( + long timestamp, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) { + if (needCollectReceivedWatermarkAndRecord) { + invokedTimerTimes.add(timestamp); + } + } + } + + private static class TestTwoOutputStreamProcessFunction + implements TwoOutputStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestTwoOutputStreamProcessFunction(boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord; + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector<Tuple2<Long, String>> output1, + Collector<Tuple2<Long, String>> output2, + TwoOutputNonPartitionedContext<Tuple2<Long, String>, Tuple2<Long, String>> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public void processRecord( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output1, + Collector<Tuple2<Long, String>> output2, + TwoOutputPartitionedContext<Tuple2<Long, String>, Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output1.collect(record); + output2.collect(record); + } + } + + private static class TestTwoOutputEventTimeStreamProcessFunction + implements TwoOutputEventTimeStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> invokedTimerTimes = new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestTwoOutputEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecord( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output1, + Collector<Tuple2<Long, 
String>> output2, + TwoOutputPartitionedContext<Tuple2<Long, String>, Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + eventTimeManager.registerTimer(record.f0 + 1); + } + output1.collect(record); + output2.collect(record); + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector<Tuple2<Long, String>> output1, + Collector<Tuple2<Long, String>> output2, + TwoOutputNonPartitionedContext<Tuple2<Long, String>, Tuple2<Long, String>> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector<Tuple2<Long, String>> output1, + Collector<Tuple2<Long, String>> output2, + TwoOutputNonPartitionedContext<Tuple2<Long, String>, Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector<Tuple2<Long, String>> output1, + Collector<Tuple2<Long, String>> output2, + TwoOutputPartitionedContext<Tuple2<Long, String>, Tuple2<Long, String>> ctx) { + if (needCollectReceivedWatermarkAndRecord) { + invokedTimerTimes.add(timestamp); + } + } + } + + private static class TestTwoInputBroadcastStreamProcessFunction + implements TwoInputBroadcastStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestTwoInputBroadcastStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = 
needCollectReceivedWatermarkAndRecord; + } + + @Override + public void processRecordFromNonBroadcastInput( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + + @Override + public void processRecordFromBroadcastInput( + Tuple2<Long, String> record, NonPartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + // do nothing + } + + @Override + public WatermarkHandlingResult onWatermarkFromBroadcastInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromNonBroadcastInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + } + + private static class TestTwoInputBroadcastEventTimeStreamProcessFunction + implements TwoInputBroadcastEventTimeStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestTwoInputBroadcastEventTimeStreamProcessFunction( + boolean 
needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecordFromNonBroadcastInput( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + + @Override + public void processRecordFromBroadcastInput( + Tuple2<Long, String> record, NonPartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + // do nothing + } + + @Override + public WatermarkHandlingResult onWatermarkFromBroadcastInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromNonBroadcastInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) { + throw new UnsupportedOperationException("This function shouldn't be invoked."); + } + } + + private static class 
TestTwoInputNonBroadcastStreamProcessFunction + implements TwoInputNonBroadcastStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>, Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + + public TestTwoInputNonBroadcastStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void processRecordFromFirstInput( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + } + output.collect(record); + } + + @Override + public void processRecordFromSecondInput( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception {} + + @Override + public WatermarkHandlingResult onWatermarkFromFirstInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + if (EventTimeExtension.isEventTimeWatermark(watermark) + && needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(((LongWatermark) watermark).getValue()); + } + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromSecondInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + return WatermarkHandlingResult.PEEK; + } + } + + private static class TestTwoInputNonBroadcastEventTimeStreamProcessFunction + implements TwoInputNonBroadcastEventTimeStreamProcessFunction< + Tuple2<Long, String>, Tuple2<Long, String>, 
Tuple2<Long, String>> { + public static ConcurrentLinkedQueue<Tuple2<Long, String>> receivedRecords = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> receivedEventTimes = + new ConcurrentLinkedQueue<>(); + public static ConcurrentLinkedQueue<Long> invokedTimerTimes = new ConcurrentLinkedQueue<>(); + private boolean needCollectReceivedWatermarkAndRecord; + private EventTimeManager eventTimeManager; + + public TestTwoInputNonBroadcastEventTimeStreamProcessFunction( + boolean needCollectReceivedWatermarkAndRecord) { + this.needCollectReceivedWatermarkAndRecord = needCollectReceivedWatermarkAndRecord; + } + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void onEventTimeWatermark( + long watermarkTimestamp, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedEventTimes.add(watermarkTimestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) { + if (needCollectReceivedWatermarkAndRecord) { + invokedTimerTimes.add(timestamp); + } + } + + @Override + public void processRecordFromFirstInput( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + if (needCollectReceivedWatermarkAndRecord) { + receivedRecords.add(record); + eventTimeManager.registerTimer(record.f0 + 1); + } + output.collect(record); + } + + @Override + public void processRecordFromSecondInput( + Tuple2<Long, String> record, + Collector<Tuple2<Long, String>> output, + PartitionedContext<Tuple2<Long, String>> ctx) + throws Exception { + // do nothing + } + + @Override + public WatermarkHandlingResult onWatermarkFromFirstInput( + Watermark watermark, + 
Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + + @Override + public WatermarkHandlingResult onWatermarkFromSecondInput( + Watermark watermark, + Collector<Tuple2<Long, String>> output, + NonPartitionedContext<Tuple2<Long, String>> ctx) { + assertThat(EventTimeExtension.isEventTimeWatermark(watermark)).isFalse(); + return WatermarkHandlingResult.PEEK; + } + } + + private NonKeyedPartitionStream<Tuple2<Long, String>> getSourceWithWatermarkGenerator() { + NonKeyedPartitionStream<Tuple2<Long, String>> source = + env.fromSource(DataStreamV2SourceUtils.fromData(inputRecords), "Source") + .withParallelism(1); + + return source.process( + EventTimeExtension.<Tuple2<Long, String>>newWatermarkGeneratorBuilder( + event -> event.f0) + .perEventWatermark() + .buildAsProcessFunction()); + } +}