This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b7038c414ccd82567c90578c3b28bef04ab2271
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Mon Jan 13 15:19:15 2025 +0800

    [FLINK-37112][API] Introduce EventTimeWatermark definition and 
EventTimeWatermarkGeneratorBuilder for DataStream V2
---
 .../common/eventtime/WatermarksWithIdleness.java   |   6 +-
 .../extension/eventtime/EventTimeExtension.java    | 139 +++++++++++++
 .../eventtime/strategy/EventTimeExtractor.java     |  31 +++
 .../EventTimeWatermarkGeneratorBuilder.java        | 125 +++++++++++
 .../strategy/EventTimeWatermarkStrategy.java       |  98 +++++++++
 .../eventtime/EventTimeExtensionImpl.java          |  45 ++++
 .../functions/ExtractEventTimeProcessFunction.java | 194 +++++++++++++++++
 .../datastream/impl/operators/ProcessOperator.java |  12 ++
 .../ExtractEventTimeProcessFunctionTest.java       | 230 +++++++++++++++++++++
 9 files changed, 878 insertions(+), 2 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
index 74a4efcc2d0..4cc651cf6e9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.eventtime;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.clock.Clock;
@@ -84,7 +85,8 @@ public class WatermarksWithIdleness<T> implements 
WatermarkGenerator<T> {
     // ------------------------------------------------------------------------
 
     @VisibleForTesting
-    static final class IdlenessTimer {
+    @Internal
+    public static final class IdlenessTimer {
 
         /** The clock used to measure elapsed time. */
         private final RelativeClock clock;
@@ -104,7 +106,7 @@ public class WatermarksWithIdleness<T> implements 
WatermarkGenerator<T> {
         /** The duration before the output is marked as idle. */
         private final long maxIdleTimeNanos;
 
-        IdlenessTimer(RelativeClock clock, Duration idleTimeout) {
+        public IdlenessTimer(RelativeClock clock, Duration idleTimeout) {
             this.clock = clock;
 
             long idleNanos;
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
new file mode 100644
index 00000000000..9417baef1d5
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+
+/**
+ * The entry point for the event-time extension, which provides the following 
functionality:
+ *
+ * <ul>
+ *   <li>defines the event-time watermark and idle status watermark. If you 
use the {@link
+ *       EventTimeWatermarkGeneratorBuilder} below, then you don't need to 
declare these watermarks
+ *       manually in your application; otherwise you need to declare them in 
your own {@link
+ *       ProcessFunction}.
+ *   <li>provides the {@link EventTimeWatermarkGeneratorBuilder} to facilitate 
the generation of
+ *       event time watermarks. An example of using {@link 
EventTimeWatermarkGeneratorBuilder} is as
+ *       follows:
+ *       <pre>{@code
+ * OneInputStreamProcessFunction<POJO, POJO> watermarkGeneratorProcessFunction
+ *       = EventTimeExtension
+ *       .newWatermarkGeneratorBuilder(POJO::getEventTime)
+ *       .periodicWatermark()
+ *       .buildAsProcessFunction();
+ * source.process(watermarkGeneratorProcessFunction)
+ *       .process(...)
+ * }</pre>
+ * </ul>
+ */
+@Experimental
+public class EventTimeExtension {
+
+    // =============== Event Time related Watermark Declarations 
===============
+
+    /**
+     * Definition of EventTimeWatermark. The EventTimeWatermark represents a 
specific timestamp,
+     * signifying the passage of time. Once a process function receives an 
EventTimeWatermark, it
+     * will no longer receive events with a timestamp earlier than that 
watermark.
+     */
+    public static final LongWatermarkDeclaration 
EVENT_TIME_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("BUILTIN_API_EVENT_TIME")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    /**
+     * Definition of IdleStatusWatermark. The IdleStatusWatermark indicates 
that a particular input
+     * is in an idle state. When a ProcessFunction receives an 
IdleStatusWatermark from an input, it
+     * should ignore that input when combining EventTimeWatermarks.
+     */
+    public static final BoolWatermarkDeclaration 
IDLE_STATUS_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("BUILTIN_API_EVENT_TIME_IDLE")
+                    .typeBool()
+                    .combineFunctionAND()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    /**
+     * Determine if the received watermark is an EventTimeWatermark.
+     *
+     * @param watermark The watermark to be checked.
+     * @return true if the watermark is an EventTimeWatermark; false otherwise.
+     */
+    public static boolean isEventTimeWatermark(Watermark watermark) {
+        return isEventTimeWatermark(watermark.getIdentifier());
+    }
+
+    /**
+     * Determine if the received watermark is an EventTimeWatermark by 
watermark identifier.
+     *
+     * @param watermarkIdentifier The identifier of the watermark to be 
checked.
+     * @return true if the watermark is an EventTimeWatermark; false otherwise.
+     */
+    public static boolean isEventTimeWatermark(String watermarkIdentifier) {
+        return 
watermarkIdentifier.equals(EVENT_TIME_WATERMARK_DECLARATION.getIdentifier());
+    }
+
+    /**
+     * Determine if the received watermark is an IdleStatusWatermark.
+     *
+     * @param watermark The watermark to be checked.
+     * @return true if the watermark is an IdleStatusWatermark; false 
otherwise.
+     */
+    public static boolean isIdleStatusWatermark(Watermark watermark) {
+        return isIdleStatusWatermark(watermark.getIdentifier());
+    }
+
+    /**
+     * Determine if the received watermark is an IdleStatusWatermark by 
watermark identifier.
+     *
+     * @param watermarkIdentifier The identifier of the watermark to be 
checked.
+     * @return true if the watermark is an IdleStatusWatermark; false 
otherwise.
+     */
+    public static boolean isIdleStatusWatermark(String watermarkIdentifier) {
+        return 
watermarkIdentifier.equals(IDLE_STATUS_WATERMARK_DECLARATION.getIdentifier());
+    }
+
+    // ======== EventTimeWatermarkGeneratorBuilder to generate event time 
watermarks =========
+
+    /**
+     * Create an instance of {@link EventTimeWatermarkGeneratorBuilder}, which 
contains a {@code
+     * EventTimeExtractor}.
+     *
+     * @param eventTimeExtractor An instance of {@code EventTimeExtractor} 
used to extract event
+     *     time information from data records.
+     * @param <T> The type of data records.
+     * @return An instance of {@code EventTimeWatermarkGeneratorBuilder} 
containing the specified
+     *     event time extractor.
+     */
+    public static <T> EventTimeWatermarkGeneratorBuilder<T> 
newWatermarkGeneratorBuilder(
+            EventTimeExtractor<T> eventTimeExtractor) {
+        return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor);
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeExtractor.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeExtractor.java
new file mode 100644
index 00000000000..cafa443cc06
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeExtractor.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+/** A user function designed to extract event time from an event. */
+@Experimental
+public interface EventTimeExtractor<T> extends Serializable {
+
+    /** Extract the event time from the event, with the result provided in 
milliseconds. */
+    long extractTimestamp(T event);
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java
new file mode 100644
index 00000000000..a330eff23dd
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.strategy;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+
+import java.time.Duration;
+
+/**
+ * A utility class for constructing a processing function that extracts event 
time and generates
+ * event time watermarks in the {@link EventTimeExtension}.
+ */
+@Experimental
+public class EventTimeWatermarkGeneratorBuilder<T> {
+    // how to extract event time from event
+    private EventTimeExtractor<T> eventTimeExtractor;
+
+    // how frequently to generate event time watermarks
+    private EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode 
generateMode =
+            EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC;
+
+    // if not set, it will default to the value of the 
"pipeline.auto-watermark-interval"
+    // configuration.
+    private Duration periodicWatermarkInterval = Duration.ZERO;
+
+    // if set to zero, it will not generate idle status watermark
+    private Duration idleTimeout = Duration.ZERO;
+
+    // max out-of-order time
+    private Duration maxOutOfOrderTime = Duration.ZERO;
+
+    // =========  how to extract event times from events =========
+
+    public EventTimeWatermarkGeneratorBuilder(EventTimeExtractor<T> 
eventTimeExtractor) {
+        this.eventTimeExtractor = eventTimeExtractor;
+    }
+
+    // =========  generate the event time watermark with what value =========
+
+    public EventTimeWatermarkGeneratorBuilder<T> withIdleness(Duration 
idleTimeout) {
+        this.idleTimeout = idleTimeout;
+        return this;
+    }
+
+    public EventTimeWatermarkGeneratorBuilder<T> 
withMaxOutOfOrderTime(Duration maxOutOfOrderTime) {
+        this.maxOutOfOrderTime = maxOutOfOrderTime;
+        return this;
+    }
+
+    // =========  when to generate event time watermark =========
+
+    public EventTimeWatermarkGeneratorBuilder<T> noWatermark() {
+        this.generateMode = 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.NO_WATERMARK;
+        return this;
+    }
+
+    /**
+     * The periodic watermark interval will be set to the value specified by
+     * PipelineOptions#AUTO_WATERMARK_INTERVAL.
+     */
+    public EventTimeWatermarkGeneratorBuilder<T> periodicWatermark() {
+        this.generateMode = 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC;
+        return this;
+    }
+
+    public EventTimeWatermarkGeneratorBuilder<T> periodicWatermark(
+            Duration periodicWatermarkInterval) {
+        this.generateMode = 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC;
+        this.periodicWatermarkInterval = periodicWatermarkInterval;
+        return this;
+    }
+
+    public EventTimeWatermarkGeneratorBuilder<T> perEventWatermark() {
+        this.generateMode = 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT;
+        return this;
+    }
+
+    // =========  build the watermark generator as process function =========
+
+    public OneInputStreamProcessFunction<T, T> buildAsProcessFunction() {
+        EventTimeWatermarkStrategy<T> watermarkStrategy =
+                new EventTimeWatermarkStrategy<>(
+                        this.eventTimeExtractor,
+                        this.generateMode,
+                        this.periodicWatermarkInterval,
+                        this.idleTimeout,
+                        this.maxOutOfOrderTime);
+
+        try {
+            return (OneInputStreamProcessFunction<T, T>)
+                    getEventTimeExtensionImplClass()
+                            .getMethod("buildAsProcessFunction", 
EventTimeWatermarkStrategy.class)
+                            .invoke(null, watermarkStrategy);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static Class<?> getEventTimeExtensionImplClass() {
+        try {
+            return Class.forName(
+                    
"org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Please ensure that flink-datastream in 
your class path");
+        }
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java
new file mode 100644
index 00000000000..9f6a7a6f585
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.strategy;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** Component which encapsulates the logic of how and when to extract event 
time and watermarks. */
+@Experimental
+public class EventTimeWatermarkStrategy<T> implements Serializable {
+    // how to extract event time from event
+    private final EventTimeExtractor<T> eventTimeExtractor;
+
+    // how frequently to generate event time watermarks
+    private final EventTimeWatermarkGenerateMode generateMode;
+
+    // if not set, it will default to the value of the 
"pipeline.auto-watermark-interval"
+    // configuration.
+    private final Duration periodicWatermarkInterval;
+
+    // if set to zero, it will not generate idle status watermark
+    private final Duration idleTimeout;
+
+    // max out-of-order time
+    private final Duration maxOutOfOrderTime;
+
+    public EventTimeWatermarkStrategy(EventTimeExtractor<T> 
eventTimeExtractor) {
+        this(
+                eventTimeExtractor,
+                EventTimeWatermarkGenerateMode.PERIODIC,
+                Duration.ZERO,
+                Duration.ZERO,
+                Duration.ZERO);
+    }
+
+    public EventTimeWatermarkStrategy(
+            EventTimeExtractor<T> eventTimeExtractor,
+            EventTimeWatermarkGenerateMode generateMode,
+            Duration periodicWatermarkInterval,
+            Duration idleTimeout,
+            Duration maxOutOfOrderTime) {
+        this.eventTimeExtractor = eventTimeExtractor;
+        this.generateMode = generateMode;
+        this.periodicWatermarkInterval = periodicWatermarkInterval;
+        this.idleTimeout = idleTimeout;
+        this.maxOutOfOrderTime = maxOutOfOrderTime;
+    }
+
+    public EventTimeExtractor<T> getEventTimeExtractor() {
+        return eventTimeExtractor;
+    }
+
+    public EventTimeWatermarkGenerateMode getGenerateMode() {
+        return generateMode;
+    }
+
+    public Duration getPeriodicWatermarkInterval() {
+        return periodicWatermarkInterval;
+    }
+
+    public Duration getIdleTimeout() {
+        return idleTimeout;
+    }
+
+    public Duration getMaxOutOfOrderTime() {
+        return maxOutOfOrderTime;
+    }
+
+    /**
+     * {@link EventTimeWatermarkGenerateMode} indicates the frequency at which 
event-time watermarks
+     * are generated.
+     */
+    @Internal
+    public enum EventTimeWatermarkGenerateMode {
+        NO_WATERMARK,
+        PERIODIC,
+        PER_EVENT,
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
new file mode 100644
index 00000000000..a4a26bc9490
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/EventTimeExtensionImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime;
+
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
+
+/** The implementation of {@link EventTimeExtension}. */
+public class EventTimeExtensionImpl {
+
+    // ============= Extract Event Time Process Function =============
+
+    /**
+     * Build an {@link ExtractEventTimeProcessFunction} to extract event time 
according to {@link
+     * EventTimeWatermarkStrategy}.
+     */
+    public static <T> OneInputStreamProcessFunction<T, T> 
buildAsProcessFunction(
+            EventTimeWatermarkStrategy<T> strategy) {
+        return new ExtractEventTimeProcessFunction<>(strategy);
+    }
+
+    // ============= Other Utils =============
+    public static boolean isEventTimeExtensionWatermark(Watermark watermark) {
+        return EventTimeExtension.isEventTimeWatermark(watermark)
+                || EventTimeExtension.isIdleStatusWatermark(watermark);
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.java
new file mode 100644
index 00000000000..db2c3bec7e3
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunction.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.functions;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.eventtime.WatermarksWithIdleness.IdlenessTimer;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.common.watermark.WatermarkManager;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A specialized process function designed for extracting event timestamps. */
+public class ExtractEventTimeProcessFunction<IN>
+        implements OneInputStreamProcessFunction<IN, IN>,
+                ProcessingTimeService.ProcessingTimeCallback {
+
+    /** User-defined watermark strategy. */
+    private final EventTimeWatermarkStrategy<IN> watermarkStrategy;
+
+    /** The maximum timestamp encountered so far. */
+    private long currentMaxEventTime = Long.MIN_VALUE;
+
+    private long lastEmittedEventTime = Long.MIN_VALUE;
+
+    /**
+     * The periodic processing timer interval; if not configured by user in 
{@link
+     * EventTimeWatermarkStrategy}, it will default to the value specified by 
{@link
+     * PipelineOptions#AUTO_WATERMARK_INTERVAL}.
+     */
+    private long periodicTimerInterval = 0;
+
+    /**
+     * Whether to create and send {@link 
EventTimeExtension#IDLE_STATUS_WATERMARK_DECLARATION}.
+     */
+    private boolean enableIdleStatus;
+
+    /** The {@link IdlenessTimer} is utilized to check whether the input is 
currently idle. */
+    private IdlenessTimer idlenessTimer;
+
+    private boolean isIdleNow = false;
+
+    private final long maxOutOfOrderTimeInMs;
+
+    private ProcessingTimeService processingTimeService;
+
+    private WatermarkManager watermarkManager;
+
+    public ExtractEventTimeProcessFunction(EventTimeWatermarkStrategy<IN> 
watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+        if (watermarkStrategy.getIdleTimeout().toMillis() > 0) {
+            this.enableIdleStatus = true;
+        }
+        this.maxOutOfOrderTimeInMs = 
watermarkStrategy.getMaxOutOfOrderTime().toMillis();
+    }
+
+    public void initEventTimeExtension(
+            ExecutionConfig config,
+            WatermarkManager watermarkManager,
+            ProcessingTimeService processingTimeService) {
+        this.processingTimeService = processingTimeService;
+        this.watermarkManager = watermarkManager;
+
+        if (enableIdleStatus) {
+            this.idlenessTimer =
+                    new IdlenessTimer(
+                            processingTimeService.getClock(), 
watermarkStrategy.getIdleTimeout());
+        }
+
+        // May need to register a timer to check whether the input is idle and to 
periodically send event
+        // time watermarks
+        boolean needRegisterTimer =
+                watermarkStrategy.getGenerateMode()
+                                == 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode
+                                        .PERIODIC
+                        || enableIdleStatus;
+        // default the timer interval to the config option {@link
+        // PipelineOptions#AUTO_WATERMARK_INTERVAL}
+        this.periodicTimerInterval = config.getAutoWatermarkInterval();
+        if (watermarkStrategy.getGenerateMode()
+                        == 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC
+                && !watermarkStrategy.getPeriodicWatermarkInterval().isZero()) 
{
+            this.periodicTimerInterval =
+                    
watermarkStrategy.getPeriodicWatermarkInterval().toMillis();
+        }
+        checkState(
+                periodicTimerInterval > 0,
+                "Watermark interval " + periodicTimerInterval + " should large 
to 0.");
+
+        if (needRegisterTimer) {
+            processingTimeService.registerTimer(
+                    processingTimeService.getCurrentProcessingTime() + 
periodicTimerInterval, this);
+        }
+    }
+
+    @Override
+    public Set<? extends WatermarkDeclaration> declareWatermarks() {
+        // declare EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION
+        // if idle status is enabled, also declare
+        // EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.
+        Set<WatermarkDeclaration> watermarkDeclarations = new HashSet<>();
+        
watermarkDeclarations.add(EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION);
+        if (enableIdleStatus) {
+            
watermarkDeclarations.add(EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION);
+        }
+        return watermarkDeclarations;
+    }
+
+    @Override
+    public void processRecord(IN record, Collector<IN> output, 
PartitionedContext<IN> ctx)
+            throws Exception {
+        if (enableIdleStatus) {
+            if (isIdleNow) {
+                watermarkManager.emitWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(false));
+                isIdleNow = false;
+            }
+
+            // mark current input as active
+            idlenessTimer.activity();
+        }
+
+        // extract event time from record
+        long extractedEventTime =
+                
watermarkStrategy.getEventTimeExtractor().extractTimestamp(record);
+        currentMaxEventTime = Math.max(currentMaxEventTime, 
extractedEventTime);
+        output.collectAndOverwriteTimestamp(record, extractedEventTime);
+
+        if (watermarkStrategy.getGenerateMode()
+                == 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT) {
+            // If per-event watermark generation is used, create an event time 
watermark and send it
+            
tryEmitEventTimeWatermark(ctx.getNonPartitionedContext().getWatermarkManager());
+        }
+    }
+
+    /**
+     * The processing timer has two goals: (1) check whether the input is idle; 
(2) periodically emit
+     * the event time watermark.
+     */
+    @Override
+    public void onProcessingTime(long time) throws IOException, 
InterruptedException, Exception {
+        if (enableIdleStatus && idlenessTimer.checkIfIdle()) {
+            if (!isIdleNow) {
+                watermarkManager.emitWatermark(
+                        
EventTimeExtension.IDLE_STATUS_WATERMARK_DECLARATION.newWatermark(true));
+                isIdleNow = true;
+            }
+        } else if (watermarkStrategy.getGenerateMode()
+                == 
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC) {
+            tryEmitEventTimeWatermark(watermarkManager);
+        }
+
+        processingTimeService.registerTimer(time + periodicTimerInterval, 
this);
+    }
+
+    private void tryEmitEventTimeWatermark(WatermarkManager watermarkManager) {
+        if (currentMaxEventTime == Long.MIN_VALUE) {
+            return;
+        }
+
+        long needEmittedEventTime = currentMaxEventTime - 
maxOutOfOrderTimeInMs;
+        if (needEmittedEventTime > lastEmittedEventTime) {
+            watermarkManager.emitWatermark(
+                    
EventTimeExtension.EVENT_TIME_WATERMARK_DECLARATION.newWatermark(
+                            needEmittedEventTime));
+            lastEmittedEventTime = needEmittedEventTime;
+        }
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index 684cbf5530d..cde50f26a74 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultPartitionedContext;
 import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
 import 
org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
+import 
org.apache.flink.datastream.impl.extension.eventtime.functions.ExtractEventTimeProcessFunction;
 import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
 import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -98,6 +99,16 @@ public class ProcessOperator<IN, OUT>
         outputCollector = getOutputCollector();
         nonPartitionedContext = getNonPartitionedContext();
         partitionedContext.setNonPartitionedContext(nonPartitionedContext);
+
+        // Initialize event time extension related ProcessFunction
+        if (userFunction instanceof ExtractEventTimeProcessFunction) {
+            ((ExtractEventTimeProcessFunction<IN>) userFunction)
+                    .initEventTimeExtension(
+                            getExecutionConfig(),
+                            
partitionedContext.getNonPartitionedContext().getWatermarkManager(),
+                            getProcessingTimeService());
+        }
+
         userFunction.open(nonPartitionedContext);
     }
 
@@ -140,6 +151,7 @@ public class ProcessOperator<IN, OUT>
         } else {
             return (r, k) -> {
                 Object oldKey = currentKey();
+                setCurrentKey(k);
                 try {
                     r.run();
                 } finally {
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunctionTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunctionTest.java
new file mode 100644
index 00000000000..1de5040e519
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/extension/eventtime/functions/ExtractEventTimeProcessFunctionTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.eventtime.functions;
+
+import org.apache.flink.api.common.watermark.BoolWatermark;
+import org.apache.flink.api.common.watermark.LongWatermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclaration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
+import org.apache.flink.datastream.impl.operators.ProcessOperator;
+import org.apache.flink.runtime.event.WatermarkEvent;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.watermark.WatermarkUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ExtractEventTimeProcessFunction} hosted in a {@link ProcessOperator}. */
+public class ExtractEventTimeProcessFunctionTest {
+
+    @Test
+    void testDoNotGenerateEventTimeWatermark() throws Exception {
+        EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy =
+                new EventTimeWatermarkStrategy<>(
+                        (EventTimeExtractor<Tuple2<Long, String>>) event -> 
event.f0,
+                        
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.NO_WATERMARK,
+                        Duration.ZERO,
+                        Duration.ZERO,
+                        Duration.ZERO);
+        OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, 
String>>
+                operatorTestHarness = 
getOperatorTestHarness(watermarkStartegy);
+        // NO_WATERMARK mode: no event time watermark is expected after either record.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345678L, "hello")));
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput());
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345679L, "hello")));
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput());
+
+        operatorTestHarness.close();
+    }
+
+    @Test
+    void testGenerateEventTimeWatermarkPerEvent() throws Exception {
+        EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy =
+                new EventTimeWatermarkStrategy<>(
+                        (EventTimeExtractor<Tuple2<Long, String>>) event -> 
event.f0,
+                        
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT,
+                        Duration.ZERO,
+                        Duration.ZERO,
+                        Duration.ZERO);
+        OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, 
String>>
+                operatorTestHarness = 
getOperatorTestHarness(watermarkStartegy);
+        // PER_EVENT mode, all durations zero: each record's event time is expected as watermark.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345678L, "hello")));
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L);
+        // The same output queue is re-read, so earlier watermarks stay in the expectation.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345679L, "hello")));
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L, 12345679L);
+
+        operatorTestHarness.close();
+    }
+
+    @Test
+    void testGenerateEventTimeWatermarkPeriodic() throws Exception {
+        EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy =
+                new EventTimeWatermarkStrategy<>(
+                        (EventTimeExtractor<Tuple2<Long, String>>) event -> 
event.f0,
+                        
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PERIODIC,
+                        Duration.ofMillis(200),
+                        Duration.ZERO,
+                        Duration.ZERO);
+        OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, 
String>>
+                operatorTestHarness = 
getOperatorTestHarness(watermarkStartegy);
+        // PERIODIC mode: advancing time before any record arrived is expected to emit nothing.
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput());
+        // The watermark for a record is checked only after time has advanced by another 200 ms.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345678L, "hello")));
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L);
+
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345679L, "hello")));
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L, 12345679L);
+        // Advancing time without new records is expected to add no further watermarks.
+        operatorTestHarness.getProcessingTimeService().advance(1000);
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L, 12345679L);
+
+        operatorTestHarness.close();
+    }
+
+    @Test
+    void testGenerateEventTimeWatermarkWithOutOfOrderTime() throws Exception {
+        EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy =
+                new EventTimeWatermarkStrategy<>(
+                        (EventTimeExtractor<Tuple2<Long, String>>) event -> 
event.f0,
+                        
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT,
+                        Duration.ZERO,
+                        Duration.ZERO,
+                        Duration.ofMillis(100));
+        OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, 
String>>
+                operatorTestHarness = 
getOperatorTestHarness(watermarkStartegy);
+        // Each watermark is expected to trail the record's event time by the configured 100 ms.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345678L, "hello")));
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L - 100);
+
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345679L, "hello")));
+        checkOutputEventTimeWatermarks(
+                operatorTestHarness.getOutput(), 12345678L - 100, 12345679L - 
100);
+
+        operatorTestHarness.close();
+    }
+
+    @Test
+    void testGenerateIdleStatusWatermark() throws Exception {
+        EventTimeWatermarkStrategy<Tuple2<Long, String>> watermarkStartegy =
+                new EventTimeWatermarkStrategy<>(
+                        (EventTimeExtractor<Tuple2<Long, String>>) event -> 
event.f0,
+                        
EventTimeWatermarkStrategy.EventTimeWatermarkGenerateMode.PER_EVENT,
+                        Duration.ZERO,
+                        Duration.ofMillis(200),
+                        Duration.ZERO);
+        OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, 
String>>
+                operatorTestHarness = 
getOperatorTestHarness(watermarkStartegy);
+        // Right after a record, no idle status watermark is expected in the output.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345678L, "hello")));
+        checkOutputIdleStatusWatermarks(operatorTestHarness.getOutput());
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L);
+
+        // Simulate time advancing multiple times and thus triggering the 
timer multiple times,
+        // otherwise {@link ExtractEventTimeProcessFunction#onProcessingTime} 
will get the wrong
+        // time when judging idleness in the timer
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        operatorTestHarness.getProcessingTimeService().advance(200);
+        checkOutputIdleStatusWatermarks(operatorTestHarness.getOutput(), true);
+        // A record arriving after idleness is expected to add idle status false and its event time.
+        operatorTestHarness.processElement(new StreamRecord<>(new 
Tuple2<>(12345679L, "hello")));
+        checkOutputIdleStatusWatermarks(operatorTestHarness.getOutput(), true, 
false);
+        checkOutputEventTimeWatermarks(operatorTestHarness.getOutput(), 
12345678L, 12345679L);
+
+        operatorTestHarness.close();
+    }
+
+    private OneInputStreamOperatorTestHarness<Tuple2<Long, String>, 
Tuple2<Long, String>>
+            getOperatorTestHarness(
+                    EventTimeWatermarkStrategy<Tuple2<Long, String>> 
watermarkStrategy)
+                    throws Exception { // returns the harness already opened
+        ExtractEventTimeProcessFunction<Tuple2<Long, String>> processFunction =
+                new ExtractEventTimeProcessFunction<>(watermarkStrategy);
+        ProcessOperator<Tuple2<Long, String>, Tuple2<Long, String>> 
processOperator =
+                new ProcessOperator<>(processFunction);
+        OneInputStreamOperatorTestHarness<Tuple2<Long, String>, Tuple2<Long, 
String>> testHarness =
+                new OneInputStreamOperatorTestHarness<>(processOperator);
+        Set<WatermarkDeclaration> watermarkDeclarations =
+                (Set<WatermarkDeclaration>) 
processFunction.declareWatermarks();
+        byte[] serializedWatermarkDeclarations =
+                InstantiationUtil.serializeObject(
+                        WatermarkUtils.convertToInternalWatermarkDeclarations(
+                                watermarkDeclarations));
+        
testHarness.getStreamConfig().setWatermarkDeclarations(serializedWatermarkDeclarations);
+        testHarness.open(); // opened after the declarations were set on the stream config
+        return testHarness;
+    }
+
+    private void checkOutputEventTimeWatermarks(
+            ConcurrentLinkedQueue<Object> output, Long... 
expectedEventTimeWatermarks) {
+        List<Long> actualEventTimeWatermarks = // event time values, in output order
+                output.stream()
+                        .filter(
+                                object ->
+                                        object instanceof WatermarkEvent
+                                                && 
EventTimeExtension.isEventTimeWatermark(
+                                                        ((WatermarkEvent) 
object).getWatermark()))
+                        .map(
+                                watermarkEvent ->
+                                        ((LongWatermark)
+                                                        ((WatermarkEvent) 
watermarkEvent)
+                                                                
.getWatermark())
+                                                .getValue())
+                        .collect(Collectors.toList());
+        
assertThat(actualEventTimeWatermarks).containsExactly(expectedEventTimeWatermarks);
+    }
+
+    private void checkOutputIdleStatusWatermarks(
+            ConcurrentLinkedQueue<Object> output, Boolean... 
expectedIdleStatusWatermarks) {
+        List<Boolean> actualEventTimeWatermarks = // NOTE(review): misnamed, these are idle statuses
+                output.stream()
+                        .filter(
+                                object ->
+                                        object instanceof WatermarkEvent
+                                                && 
EventTimeExtension.isIdleStatusWatermark(
+                                                        ((WatermarkEvent) 
object).getWatermark()))
+                        .map(
+                                watermarkEvent ->
+                                        ((BoolWatermark)
+                                                        ((WatermarkEvent) 
watermarkEvent)
+                                                                
.getWatermark())
+                                                .getValue())
+                        .collect(Collectors.toList());
+        
assertThat(actualEventTimeWatermarks).containsExactly(expectedIdleStatusWatermarks);
+    }
+}


Reply via email to