Zakelly commented on code in PR #26328:
URL: https://github.com/apache/flink/pull/26328#discussion_r2024470829


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/functions/DeclaringAsyncKeyedCoProcessFunction.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * A function that processes elements of two keyed streams and produces a single output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero or
+ * more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also query the
+ * time (both event and processing) and set timers, through the provided {@link Context}. When
+ * reacting to the firing of timers the function can emit yet more elements.
+ *
+ * <p>An example use case for connected streams is the application of a set of rules that change
+ * over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). The
+ * rules contained in {@code stream A} can be stored in the state and wait for new elements to
+ * arrive on {@code stream B}. Upon reception of a new element on {@code stream B}, the function can
+ * apply the previously stored rules to the element and emit a result, and/or register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <K> Type of the key.
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@Internal
+public abstract class DeclaringAsyncKeyedCoProcessFunction<K, IN1, IN2, OUT>
+        extends KeyedCoProcessFunction<K, IN1, IN2, OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Override this method or use {@link #declareProcess1} instead. */
+    @Override
+    public void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
+        throw new IllegalAccessException("This method is replaced by declareProcess1.");
+    }
+
+    /** Override this method or use {@link #declareProcess2} instead. */

Review Comment:
   Actually, we don't recommend using this method. How about rewriting this 
javadoc?
   ```suggestion
       /** Override and finalize this method. Please use {@link #declareProcess2} instead. */
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -387,8 +387,12 @@ public Watermark preProcessWatermark(Watermark watermark) throws Exception {
      * perform async state here. Only some synchronous logic is suggested.
      *
      * @param watermark the advanced watermark.
+     * @return the watermark that should be emitted to downstream. Null if there is no need for
+     *     following emitting.
      */
-    public void postProcessWatermark(Watermark watermark) throws Exception {}
+    public Watermark postProcessWatermark(Watermark watermark) throws Exception {

Review Comment:
   How about changing the javadoc of this method to:
   
   A hook that will be invoked after the watermark has finished advancing and 
right before the watermark is emitted downstream. This is a chance to 
customize the emitted watermark. ....(and following description)
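
   To illustrate the contract described in the new `@return` javadoc, here is a 
minimal sketch. It uses simplified stand-in classes (`SketchWatermark`, 
`SketchOperator`, `ThresholdOperator` are hypothetical names, not the actual 
Flink types), and it only assumes what the diff states: the hook returns the 
watermark to emit, or null when nothing should be emitted.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Simplified stand-in for illustration only; not Flink's Watermark class.
   final class SketchWatermark {
       final long timestamp;

       SketchWatermark(long timestamp) {
           this.timestamp = timestamp;
       }
   }

   // Simplified stand-in for an operator that exposes the post-process hook.
   class SketchOperator {
       // Records emitted timestamps in place of a real downstream output.
       final List<Long> emitted = new ArrayList<>();

       /**
        * Hook invoked after the watermark has advanced and right before it is
        * emitted downstream. Returning null suppresses the emission.
        */
       SketchWatermark postProcessWatermark(SketchWatermark watermark) {
           return watermark;
       }

       final void processWatermark(SketchWatermark watermark) {
           // Timer advancement would happen here in a real operator.
           SketchWatermark toEmit = postProcessWatermark(watermark);
           if (toEmit != null) {
               emitted.add(toEmit.timestamp);
           }
       }
   }

   // Hypothetical customization: hold back watermarks below a threshold.
   class ThresholdOperator extends SketchOperator {
       private final long minTimestamp;

       ThresholdOperator(long minTimestamp) {
           this.minTimestamp = minTimestamp;
       }

       @Override
       SketchWatermark postProcessWatermark(SketchWatermark watermark) {
           return watermark.timestamp < minTimestamp ? null : watermark;
       }
   }
   ```

   The caller checks for null in one place, so subclasses only decide what to 
return and never emit directly.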



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/co/AsyncIntervalJoinOperator.java:
##########
@@ -417,12 +419,12 @@ public <X> void output(OutputTag<X> outputTag, X value) {
     }
 
     @VisibleForTesting
-    MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {
+    public MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {

Review Comment:
   Seems unnecessary?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java:
##########
@@ -367,12 +368,18 @@ private TwoInputTransformation<RowData, RowData, RowData> createProcTimeJoin(
                         leftTypeInfo,
                         rightTypeInfo,
                         joinFunction);
-
+        TwoInputStreamOperator<RowData, RowData, RowData> operator;
+        if (config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED)) {
+            // TODO: add async version procJoinFunc to use AsyncKeyedCoProcessOperator
+            operator = new KeyedCoProcessOperator<>(procJoinFunc);
+        } else {
+            operator = new KeyedCoProcessOperator<>(procJoinFunc);
+        }

Review Comment:
   We could revert this part until we start implementing the SQL operator.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java:
##########
@@ -523,4 +529,14 @@ private <R> SingleOutputStreamOperator<R> doTransform(
 
         return returnStream;
     }
+
+    private boolean isEnableAsyncState() {
+        boolean enableAsyncState = false;
+        if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
+            enableAsyncState =
+                    ((KeyedStream<?, ?>) inputStream1).isEnableAsyncState()
+                            && ((KeyedStream<?, ?>) inputStream2).isEnableAsyncState();
+        }
+        return enableAsyncState;
+    }

Review Comment:
   I'd suggest we keep a member `boolean isEnableAsyncState` here, so that users 
can call `enableAsyncState()` on `ConnectedStreams`. WDYT?
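
   Roughly, the shape I have in mind could look like the sketch below. 
`SketchConnectedStreams` is a hypothetical, simplified stand-in rather than the 
real `ConnectedStreams`. The fluent return type is also an assumption for 
illustration, not a proposal for the exact signature.

   ```java
   // Simplified stand-in for illustration only; the real ConnectedStreams
   // carries type parameters and references to both input streams.
   class SketchConnectedStreams {
       // Member flag, off by default, instead of deriving it from the inputs.
       private boolean isEnableAsyncState = false;

       // Assumption: returns this so calls can be chained. Real code would
       // likely also need to verify that both inputs are keyed streams.
       SketchConnectedStreams enableAsyncState() {
           this.isEnableAsyncState = true;
           return this;
       }

       boolean isEnableAsyncState() {
           return isEnableAsyncState;
       }
   }
   ```

   With a member flag, the setting becomes an explicit choice on the connected 
streams instead of being inferred from both inputs.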


