reswqa commented on code in PR #25978:
URL: https://github.com/apache/flink/pull/25978#discussion_r1914624743
##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/attribute/StreamingJobGraphGeneratorWithAttributeTest.java:
##########
@@ -212,7 +212,7 @@ private static class NoOutputUntilEndOfInputMapTask
@Override
public void processRecord(
- Integer record, Collector<Integer> output, PartitionedContext
ctx) {
+ Integer record, Collector<Integer> output,
PartitionedContext<Integer> ctx) {
Review Comment:
`[hotfix] Add generics to places where PartitionedContext is used` ->
`[hotfix] Add missing generic type param for the use of PartitionedContext`
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/EventTimeExtension.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.watermark.BoolWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
+import org.apache.flink.api.common.watermark.Watermark;
+import org.apache.flink.api.common.watermark.WatermarkDeclarations;
+import
org.apache.flink.datastream.api.extension.eventtime.function.EventTimeProcessFunction;
+import
org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputBroadcastEventTimeStreamProcessFunction;
+import
org.apache.flink.datastream.api.extension.eventtime.function.TwoInputNonBroadcastEventTimeStreamProcessFunction;
+import
org.apache.flink.datastream.api.extension.eventtime.function.TwoOutputEventTimeStreamProcessFunction;
+import
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
+import
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder;
+import
org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkStrategy;
+import
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/**
+ * The entry point for the event-time extension, which provides the following
functionality:
+ *
+ * <ul>
+ * <li>defines the event-time watermark.
+ * <li>provides the {@link EventTimeWatermarkGeneratorBuilder} to facilitate
the generation of
+ * event time watermarks.
+ * <li>provides a tool to encapsulate a user-defined {@link
EventTimeProcessFunction} to provide
+ * the relevant components of the event-time extension.
+ * </ul>
+ */
+@Experimental
+public class EventTimeExtension {
Review Comment:
This is the core api for event-time extension. We have to add more
user-friendly java doc for every public field and method.
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkStrategy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * This class represents how and when to extract event time and watermarks in
the Event Time
+ * Extension.
+ */
Review Comment:
```suggestion
/**
* Component which encapsulates the logic of how and when to extract event
time and watermarks.
*/
```
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/timer/EventTimeManager.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.timer;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/**
+ * This utility class allows users to register and delete event timers, as
well as retrieve event
+ * times. Note that registering event timers can only be used with {@link
KeyedPartitionStream}.
Review Comment:
This class is responsible for managing stuff related to event-time/timer.
For example, register and delete event timers, as well as retrieve event time.
Note that methods for timer can only be used in {@link KeyedPartitionStream}.
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+
+/** The {@link OneInputStreamProcessFunction} that extends with event time
support. */
+@Experimental
+public interface OneInputEventTimeStreamProcessFunction<IN, OUT>
+ extends EventTimeProcessFunction, OneInputStreamProcessFunction<IN,
OUT> {
+
+ default void onEventTimeWatermark(
Review Comment:
java doc.
ditto for other XXXEventTimeStreamProcessFunction
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/strategy/EventTimeWatermarkGeneratorBuilder.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.strategy;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+
+import java.time.Duration;
+
+/**
+ * A utility class for constructing a processing function that extracts event
time and generates
+ * event time watermarks in the {@link EventTimeExtension}.
+ */
+@Experimental
+public class EventTimeWatermarkGeneratorBuilder<T> {
+
+ private final EventTimeWatermarkStrategy<T> strategy;
Review Comment:
`EventTimeWatermarkStrategy` should be immutable.
I suggest we move the building logic into this class.
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/EventTimeProcessFunction.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import
org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+
+/**
+ * The base interface for event time process functions, indicating that the
process function will
+ * use event time extensions, such as registering event timers and handle
event time watermarks.
+ * Note that user-defined process functions should implement this
sub-interface rather than this
+ * interface.
Review Comment:
```suggestion
/**
* The base interface for event time processing, indicating that the {@link
ProcessFunction} will be
* enriched with event time processing functions, such as registering event
timers and handle event time watermarks.
```
##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/eventtime/function/OneInputEventTimeStreamProcessFunction.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.eventtime.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+
+/** The {@link OneInputStreamProcessFunction} that extends with event time
support. */
+@Experimental
+public interface OneInputEventTimeStreamProcessFunction<IN, OUT>
+ extends EventTimeProcessFunction, OneInputStreamProcessFunction<IN,
OUT> {
+
+ default void onEventTimeWatermark(
+ long watermarkTimestamp, Collector<OUT> output,
NonPartitionedContext<OUT> ctx)
+ throws Exception {}
+
+ default void onEventTimer(long timestamp, Collector<OUT> output,
PartitionedContext<OUT> ctx) {}
Review Comment:
java doc.
ditto for other XXXEventTimeStreamProcessFunction
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]