Zakelly commented on code in PR #26698: URL: https://github.com/apache/flink/pull/26698#discussion_r2158135519
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncKeyOrderedLookupOperator.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; 
+import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This operator serves a similar purpose to {@link AsyncWaitOperator}. Unlike {@link + * AsyncWaitOperator}, this operator supports key-ordered async processing. + * + * <p>If the planner can infer the upsert key, then the order key used for processing will be the + * upsert key; otherwise, the entire row will be treated as the order key. + * + * @param <IN> Input type for the operator. + * @param <OUT> Output type for the operator. + * @param <KEY> Key type for the operator. + */ +public class AsyncKeyOrderedLookupOperator<IN, OUT, KEY> + extends AsyncUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT>, BoundedOneInput { + + /** Timeout for the async collectors. */ + private final long timeout; + + private transient TimestampedCollector<OUT> timestampedCollector; + + /** Number of inputs which is invoked for lookup but do not output until now. 
*/ + private final transient AtomicInteger totalInflightNum; + + public AsyncKeyOrderedLookupOperator( + AsyncFunction<IN, OUT> asyncFunction, + KeySelector<IN, KEY> keySelector, + ExecutorService asyncThreadPool, + int asyncBufferSize, + long asyncBufferTimeout, + int inFlightRecordsLimit, + long timeout, + ProcessingTimeService processingTimeService) { + super( + asyncFunction, + keySelector, + null, + asyncThreadPool, + asyncBufferSize, + asyncBufferTimeout, + inFlightRecordsLimit); + this.timeout = timeout; + this.totalInflightNum = new AtomicInteger(0); + this.processingTimeService = Preconditions.checkNotNull(processingTimeService); + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<OUT>> output) { + super.setup(containingTask, config, output); + this.timestampedCollector = new TimestampedCollector<>(super.output); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + } + + @Override + public void open() throws Exception { + super.open(); + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + totalInflightNum.incrementAndGet(); + asyncProcess( + // Once this runnable returned, the ref of corresponding count decrement to zero + // which would trigger dispose the context in AsyncExecutionController + // This part is executed in the AsyncExecutor + () -> { + KeyedResultHandler handler = invoke(element); + handler.waitUntilOutput(); + return null; + }); + } + + @Override + public void endInput() throws Exception { + asyncExecutionController.drainInflightRecords(0); + waitAllInFlightInputsFinished(); + } + + @Override + protected KeySelector<?, ?> getKeySelectorForAsyncKeyedContext(int index) { + // This operator is OneInputStreamOperator + return keySelector1; + } + + public KeyedResultHandler invoke(StreamRecord<IN> element) throws Exception { + final KeyedResultHandler 
resultHandler = + new KeyedResultHandler(new StreamRecordQueueEntry<>(element)); + // register a timeout for the entry if timeout is configured + if (timeout > 0L) { + resultHandler.registerTimeout(getProcessingTimeService(), element, timeout); + } + userFunction.asyncInvoke(element.getValue(), resultHandler); + return resultHandler; + } + + public void waitAllInFlightInputsFinished() { Review Comment: Maybe `asyncExecutionController.drainInflightRecords(0);` is enough. I'd suggest not using `waitUntil`, since it is mainly used for waiting on specified requests. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/AsyncKeyOrderedLookupOperator.java: ########## @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; +import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This operator serves a similar purpose to {@link AsyncWaitOperator}. Unlike {@link + * AsyncWaitOperator}, this operator supports key-ordered async processing. + * + * <p>If the planner can infer the upsert key, then the order key used for processing will be the + * upsert key; otherwise, the entire row will be treated as the order key. + * + * @param <IN> Input type for the operator. + * @param <OUT> Output type for the operator. + * @param <KEY> Key type for the operator. 
+ */ +public class AsyncKeyOrderedLookupOperator<IN, OUT, KEY> + extends AsyncUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> + implements OneInputStreamOperator<IN, OUT>, BoundedOneInput { + + /** Timeout for the async collectors. */ + private final long timeout; + + private transient TimestampedCollector<OUT> timestampedCollector; + + /** Number of inputs which is invoked for lookup but do not output until now. */ + private final transient AtomicInteger totalInflightNum; + + public AsyncKeyOrderedLookupOperator( + AsyncFunction<IN, OUT> asyncFunction, + KeySelector<IN, KEY> keySelector, + ExecutorService asyncThreadPool, + int asyncBufferSize, + long asyncBufferTimeout, + int inFlightRecordsLimit, + long timeout, + ProcessingTimeService processingTimeService) { + super( + asyncFunction, + keySelector, + null, + asyncThreadPool, + asyncBufferSize, + asyncBufferTimeout, + inFlightRecordsLimit); + this.timeout = timeout; + this.totalInflightNum = new AtomicInteger(0); + this.processingTimeService = Preconditions.checkNotNull(processingTimeService); + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<OUT>> output) { + super.setup(containingTask, config, output); + this.timestampedCollector = new TimestampedCollector<>(super.output); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + } + + @Override + public void open() throws Exception { + super.open(); + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + totalInflightNum.incrementAndGet(); + asyncProcess( + // Once this runnable returned, the ref of corresponding count decrement to zero + // which would trigger dispose the context in AsyncExecutionController + // This part is executed in the AsyncExecutor + () -> { + KeyedResultHandler handler = invoke(element); Review Comment: Ahhh..... 
I don't think this is the best practice. I'd suggest that `invoke` produce an `AsyncFuture`, by wrapping one and providing it to `asyncFunction`, and then call `thenXxxx` on the future here to chain the follow-up action. The AEC will track all the chained operations and keep them running in the main thread. We'd better not do something like `KeyedResultHandler.processInMailbox`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org