Zakelly commented on code in PR #26698:
URL: https://github.com/apache/flink/pull/26698#discussion_r2158135519


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncKeyOrderedLookupOperator.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import 
org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This operator serves a similar purpose to {@link AsyncWaitOperator}. Unlike 
{@link
+ * AsyncWaitOperator}, this operator supports key-ordered async processing.
+ *
+ * <p>If the planner can infer the upsert key, then the order key used for 
processing will be the
+ * upsert key; otherwise, the entire row will be treated as the order key.
+ *
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ * @param <KEY> Key type for the operator.
+ */
+public class AsyncKeyOrderedLookupOperator<IN, OUT, KEY>
+        extends AsyncUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+    /** Timeout for the async collectors. */
+    private final long timeout;
+
+    private transient TimestampedCollector<OUT> timestampedCollector;
+
+    /** Number of inputs which is invoked for lookup but do not output until 
now. */
+    private final transient AtomicInteger totalInflightNum;
+
+    public AsyncKeyOrderedLookupOperator(
+            AsyncFunction<IN, OUT> asyncFunction,
+            KeySelector<IN, KEY> keySelector,
+            ExecutorService asyncThreadPool,
+            int asyncBufferSize,
+            long asyncBufferTimeout,
+            int inFlightRecordsLimit,
+            long timeout,
+            ProcessingTimeService processingTimeService) {
+        super(
+                asyncFunction,
+                keySelector,
+                null,
+                asyncThreadPool,
+                asyncBufferSize,
+                asyncBufferTimeout,
+                inFlightRecordsLimit);
+        this.timeout = timeout;
+        this.totalInflightNum = new AtomicInteger(0);
+        this.processingTimeService = 
Preconditions.checkNotNull(processingTimeService);
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+        this.timestampedCollector = new TimestampedCollector<>(super.output);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        totalInflightNum.incrementAndGet();
+        asyncProcess(
+                // Once this runnable returned, the ref of corresponding count 
decrement to zero
+                // which would trigger dispose the context in 
AsyncExecutionController
+                // This part is executed in the AsyncExecutor
+                () -> {
+                    KeyedResultHandler handler = invoke(element);
+                    handler.waitUntilOutput();
+                    return null;
+                });
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        asyncExecutionController.drainInflightRecords(0);
+        waitAllInFlightInputsFinished();
+    }
+
+    @Override
+    protected KeySelector<?, ?> getKeySelectorForAsyncKeyedContext(int index) {
+        // This operator is OneInputStreamOperator
+        return keySelector1;
+    }
+
+    public KeyedResultHandler invoke(StreamRecord<IN> element) throws 
Exception {
+        final KeyedResultHandler resultHandler =
+                new KeyedResultHandler(new StreamRecordQueueEntry<>(element));
+        // register a timeout for the entry if timeout is configured
+        if (timeout > 0L) {
+            resultHandler.registerTimeout(getProcessingTimeService(), element, 
timeout);
+        }
+        userFunction.asyncInvoke(element.getValue(), resultHandler);
+        return resultHandler;
+    }
+
+    public void waitAllInFlightInputsFinished() {

Review Comment:
   Maybe `asyncExecutionController.drainInflightRecords(0);` alone is enough. I'd 
suggest not using `waitUntil`, since it is mainly used for waiting on specific 
requests.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncKeyOrderedLookupOperator.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
+import 
org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This operator serves a similar purpose to {@link AsyncWaitOperator}. Unlike 
{@link
+ * AsyncWaitOperator}, this operator supports key-ordered async processing.
+ *
+ * <p>If the planner can infer the upsert key, then the order key used for 
processing will be the
+ * upsert key; otherwise, the entire row will be treated as the order key.
+ *
+ * @param <IN> Input type for the operator.
+ * @param <OUT> Output type for the operator.
+ * @param <KEY> Key type for the operator.
+ */
+public class AsyncKeyOrderedLookupOperator<IN, OUT, KEY>
+        extends AsyncUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+    /** Timeout for the async collectors. */
+    private final long timeout;
+
+    private transient TimestampedCollector<OUT> timestampedCollector;
+
+    /** Number of inputs which is invoked for lookup but do not output until 
now. */
+    private final transient AtomicInteger totalInflightNum;
+
+    public AsyncKeyOrderedLookupOperator(
+            AsyncFunction<IN, OUT> asyncFunction,
+            KeySelector<IN, KEY> keySelector,
+            ExecutorService asyncThreadPool,
+            int asyncBufferSize,
+            long asyncBufferTimeout,
+            int inFlightRecordsLimit,
+            long timeout,
+            ProcessingTimeService processingTimeService) {
+        super(
+                asyncFunction,
+                keySelector,
+                null,
+                asyncThreadPool,
+                asyncBufferSize,
+                asyncBufferTimeout,
+                inFlightRecordsLimit);
+        this.timeout = timeout;
+        this.totalInflightNum = new AtomicInteger(0);
+        this.processingTimeService = 
Preconditions.checkNotNull(processingTimeService);
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+        this.timestampedCollector = new TimestampedCollector<>(super.output);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+    }
+
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        totalInflightNum.incrementAndGet();
+        asyncProcess(
+                // Once this runnable returned, the ref of corresponding count 
decrement to zero
+                // which would trigger dispose the context in 
AsyncExecutionController
+                // This part is executed in the AsyncExecutor
+                () -> {
+                    KeyedResultHandler handler = invoke(element);

Review Comment:
   Ahhh..... I don't think this is the best practice.
   
   I'd suggest that `invoke` produce an `AsyncFuture`, by wrapping one and 
providing it to `asyncFunction`. Then call `thenXxxx` on the future here to chain 
the following action. The AEC will track all the chained operations and keep 
them running in the main thread. We'd better not do something like 
`KeyedResultHandler.processInMailbox`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to