reswqa commented on code in PR #25978:
URL: https://github.com/apache/flink/pull/25978#discussion_r1917894855


##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.EventTimeProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
+import 
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder;
+import 
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/**
+ * The entry point for the event-time extension, which provides the following 
functionality:
+ *
+ * <ul>
+ *   <li>defines the event-time watermark and idle status watermark. If you 
use the {@link
+ *       EventTimeWatermarkGeneratorBuilder} below, then you don't need to 
declare these watermarks
+ *       manually in your application; otherwise you need to declare them in 
your own {@link
+ *       ProcessFunction}.
+ *   <li>provides the {@link EventTimeWatermarkGeneratorBuilder} to facilitate 
the generation of
+ *       event time watermarks. An example of using {@link 
EventTimeWatermarkGeneratorBuilder} is as
+ *       follows:
+ *       <pre>{@code
+ * OneInputStreamProcessFunction<POJO, POJO> watermarkGeneratorProcessFunction
+ *       = EventTimeExtension
+ *       .newWatermarkGeneratorBuilder(POJO::getEventTime)
+ *       .periodicWatermark()
+ *       .buildAsProcessFunction();
+ * source.process(watermarkGeneratorProcessFunction)
+ *       .process(...)
+ * }</pre>
+ *   <li>provides a tool to encapsulate a user-defined {@link 
EventTimeProcessFunction} to provide
+ *       the relevant components of the event-time extension.
+ *       <pre>{@code
+ * stream.process(
+ *          EventTimeExtension.wrapProcessFunction(
+ *              new CustomEventTimeProcessFunction()
+ *          )
+ *       )
+ *       .process(...)
+ * }</pre>
+ * </ul>
+ */
+@Experimental
+public class EventTimeExtension {
+
+    // =============== Event Time related Watermark Declarations 
===============
+
+    /**
+     * Definition of EventTimeWatermark. The EventTimeWatermark represents a 
specific timestamp,
+     * signifying the passage of time. Once a process function receives an 
EventTimeWatermark, it
+     * will no longer receive events with a timestamp earlier than that 
watermark.
+     */
+    public static final LongWatermarkDeclaration 
EVENT_TIME_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("BUILTIN_API_EVENT_TIME")
+                    .typeLong()
+                    .combineFunctionMin()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    /**
+     * Definition of IdleStatusWatermark. The IdleStatusWatermark indicates 
that a particular input
+     * is in an idle state. When a ProcessFunction receives an 
IdleStatusWatermark from an input, it
+     * should ignore that input when combining EventTimeWatermarks.
+     */
+    public static final BoolWatermarkDeclaration 
IDLE_STATUS_WATERMARK_DECLARATION =
+            WatermarkDeclarations.newBuilder("BUILTIN_API_EVENT_TIME_IDLE")
+                    .typeBool()
+                    .combineFunctionAND()
+                    .combineWaitForAllChannels(true)
+                    .defaultHandlingStrategyForward()
+                    .build();
+
+    /** Determine if the received watermark is an EventTimeWatermark. */
+    public static boolean isEventTimeWatermark(Watermark watermark) {
+        return isEventTimeWatermark(watermark.getIdentifier());
+    }
+
+    /** Determine if the received watermark is an EventTimeWatermark by 
watermark identifier. */
+    public static boolean isEventTimeWatermark(String watermarkIdentifier) {
+        return 
watermarkIdentifier.equals(EVENT_TIME_WATERMARK_DECLARATION.getIdentifier());
+    }
+
+    /** Determine if the received watermark is an IdleStatusWatermark. */
+    public static boolean isIdleStatusWatermark(Watermark watermark) {
+        return isIdleStatusWatermark(watermark.getIdentifier());
+    }
+
+    /** Determine if the received watermark is an IdleStatusWatermark by 
watermark identifier. */
+    public static boolean isIdleStatusWatermark(String watermarkIdentifier) {
+        return 
watermarkIdentifier.equals(IDLE_STATUS_WATERMARK_DECLARATION.getIdentifier());
+    }
+
+    // ======== EventTimeWatermarkGeneratorBuilder to generate event time 
watermarks =========
+
+    /**
+     * Create an instance of {@link EventTimeWatermarkGeneratorBuilder}, which 
contains a {@code
+     * EventTimeExtractor}.
+     */
+    public static <T> EventTimeWatermarkGeneratorBuilder<T> 
newWatermarkGeneratorBuilder(
+            EventTimeExtractor<T> eventTimeExtractor) {
+        return new EventTimeWatermarkGeneratorBuilder<>(eventTimeExtractor);
+    }
+
+    // ======== Wrap user-defined {@link EventTimeProcessFunction} =========
+
+    /**

Review Comment:
   param and return value.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** Component which encapsulates the logic of how and when to extract event 
time and watermarks. */
+@Experimental
+public class EventTimeWatermarkStrategy<T> implements Serializable {
+    // how to extract event time from event
+    private EventTimeExtractor<T> eventTimeExtractor;

Review Comment:
   can be final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to