xuyangzhong commented on code in PR #26649: URL: https://github.com/apache/flink/pull/26649#discussion_r2149531552
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/AsyncDeltaJoinRunner.java: ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.deltajoin; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.collector.TableFunctionResultFuture; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.generated.GeneratedResultFuture; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; + +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** The async join runner to look up the dimension table in delta join. */ +public class AsyncDeltaJoinRunner extends RichAsyncFunction<RowData, RowData> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDeltaJoinRunner.class); + + private static final long serialVersionUID = 1L; + + private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher; + private final DataStructureConverter<RowData, Object> fetcherConverter; + private final GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture; + private final int asyncBufferCapacity; + + private transient AsyncFunction<RowData, Object> fetcher; + + protected final RowDataSerializer lookupSideRowSerializer; + + private final boolean treatRightAsLookupTable; + + /** + * Buffers {@link ResultFuture} to avoid newInstance cost when processing elements every time. + * We use {@link BlockingQueue} to make sure the head {@link ResultFuture}s are available. + */ + private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer; + + /** + * A Collection contains all ResultFutures in the runner which is used to invoke {@code close()} + * on every ResultFuture. {@link #resultFutureBuffer} may not contain all the ResultFutures + * because ResultFutures will be polled from the buffer when processing. 
+ */ + private transient List<JoinedRowResultFuture> allResultFutures; + + // metrics + private long callAsyncFetchCostTime = 0L; Review Comment: nit: add `transient` ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java: ########## @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.join.deltajoin; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import 
org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.TableAbstractCoUdfStreamOperator; +import org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord; +import org.apache.flink.table.runtime.operators.join.lookup.keyordered.TableAsyncExecutionController; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** This operator supports delta join in table layer. 
*/ +public class StreamingDeltaJoinOperator + extends TableAbstractCoUdfStreamOperator< + RowData, AsyncDeltaJoinRunner, AsyncDeltaJoinRunner> + implements TwoInputStreamOperator<RowData, RowData, RowData>, BoundedMultiInput { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingDeltaJoinOperator.class); + + private static final long serialVersionUID = 1L; + + private static final int LEFT_INPUT_INDEX = 0; + private static final int RIGHT_INPUT_INDEX = 1; + + private static final String STATE_NAME = "_delta_join_operator_async_wait_state_"; + + private final StreamRecord<RowData> leftEmptyStreamRecord; + private final StreamRecord<RowData> rightEmptyStreamRecord; + + /** Selector to get join key from left input. */ + private final RowDataKeySelector leftJoinKeySelector; + + /** Selector to get join key from right input. */ + private final RowDataKeySelector rightJoinKeySelector; + + /** Timeout for the async collectors. */ + private final long timeout; + + /** Max number of inflight invocation. */ + private final int capacity; + + private transient boolean needDeepCopy; + + /** {@link TypeSerializer} for left side inputs while making snapshots. */ + private transient StreamElementSerializer<RowData> leftStreamElementSerializer; + + /** {@link TypeSerializer} for right side inputs while making snapshots. */ + private transient StreamElementSerializer<RowData> rightStreamElementSerializer; + + private transient TimestampedCollector<RowData> timestampedCollector; + + /** + * Recovered input stream elements backed by keyed state. + * + * <p>{@code <LeftElementRecord, RightElementRecord, Watermark, InputIndex>}. + */ + private transient ListState<Tuple4<StreamElement, StreamElement, StreamElement, Integer>> + recoveredStreamElements; + + /** Structure to control the process order of input records. 
*/ + private transient TableAsyncExecutionController<RowData, RowData, RowData> + asyncExecutionController; + + /** Mailbox executor used to yield while waiting for buffers to empty. */ + private final transient MailboxExecutor mailboxExecutor; + + private final transient AtomicInteger totalInflightNum; + + private final boolean[] isInputEnded; + + private final transient AtomicLong asyncIOTime = new AtomicLong(Long.MIN_VALUE); + + /** + * Buffers {@link ReusableKeyedResultHandler} to avoid newInstance cost when processing elements + * every time. We use {@link BlockingQueue} to make sure the head {@link + * ReusableKeyedResultHandler}s are available. + */ + private transient BlockingQueue<ReusableKeyedResultHandler> resultHandlerBuffer; + + /** + * A Collection contains all KeyedResultHandlers in the runner which is used to invoke {@code + * close()} on every ResultFuture. {@link #resultHandlerBuffer} may not contain all the + * ResultFutures because ResultFutures will be polled from the buffer when processing. 
+ */ + private transient List<ReusableKeyedResultHandler> allResultHandlers; + + public StreamingDeltaJoinOperator( + AsyncDeltaJoinRunner rightAsyncFunction, + AsyncDeltaJoinRunner leftAsyncFunction, + RowDataKeySelector leftJoinKeySelector, + RowDataKeySelector rightJoinKeySelector, + long timeout, + int capacity, + ProcessingTimeService processingTimeService, + MailboxExecutor mailboxExecutor, + RowType leftStreamType, + RowType rightStreamType) { + // rightAsyncFunction is an udx used for left records + // leftAsyncFunction is an udx used for right records + super(rightAsyncFunction, leftAsyncFunction); + this.leftJoinKeySelector = leftJoinKeySelector; + this.rightJoinKeySelector = rightJoinKeySelector; + this.timeout = timeout; + this.capacity = capacity; + this.processingTimeService = checkNotNull(processingTimeService); + this.mailboxExecutor = mailboxExecutor; + this.totalInflightNum = new AtomicInteger(0); + this.isInputEnded = new boolean[2]; + this.leftEmptyStreamRecord = + new StreamRecord<>(new GenericRowData(leftStreamType.getFieldCount())); + this.rightEmptyStreamRecord = + new StreamRecord<>(new GenericRowData(rightStreamType.getFieldCount())); + } + + @Override + public void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<RowData>> output) { + super.setup(containingTask, config, output); + + this.leftStreamElementSerializer = + new StreamElementSerializer<>( + getOperatorConfig().getTypeSerializerIn(0, getUserCodeClassloader())); + this.rightStreamElementSerializer = + new StreamElementSerializer<>( + getOperatorConfig().getTypeSerializerIn(1, getUserCodeClassloader())); + + this.timestampedCollector = new TimestampedCollector<>(super.output); + this.asyncExecutionController = + new TableAsyncExecutionController<>( + this::invoke, + this::emitWatermark, + entry -> { + entry.emitResult(timestampedCollector); + totalInflightNum.decrementAndGet(); + }, + (entry) -> { + checkState( + entry instanceof 
InputIndexAwareStreamRecordQueueEntry, + "The entry type is " + entry.getClass().getSimpleName()); + return ((InputIndexAwareStreamRecordQueueEntry) entry).inputIndex; + }, + (record, inputIndex) -> { + RowDataKeySelector keySelector = + isLeft(inputIndex) ? leftJoinKeySelector : rightJoinKeySelector; + return keySelector.getKey(record.getValue()); + }); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + TypeInformation<Tuple4<StreamElement, StreamElement, StreamElement, Integer>> + stateTypeInfo = + Types.TUPLE( + TypeInformation.of(StreamElement.class), + TypeInformation.of(StreamElement.class), + TypeInformation.of(StreamElement.class), + BasicTypeInfo.INT_TYPE_INFO); + + Class<Tuple4<StreamElement, StreamElement, StreamElement, Integer>> type = + stateTypeInfo.getTypeClass(); + + TypeSerializer<?>[] stateElementSerializers = + new TypeSerializer[] { + leftStreamElementSerializer, + rightStreamElementSerializer, + leftStreamElementSerializer, // watermark + IntSerializer.INSTANCE + }; + TupleSerializer<Tuple4<StreamElement, StreamElement, StreamElement, Integer>> + stateSerializer = new TupleSerializer<>(type, stateElementSerializers); + recoveredStreamElements = + context.getKeyedStateStore() + .getListState(new ListStateDescriptor<>(STATE_NAME, stateSerializer)); + } + + @Override + public void open() throws Exception { + super.open(); + + this.needDeepCopy = getExecutionConfig().isObjectReuseEnabled() && !config.isChainStart(); + + // register metrics + // 1. aec metrics + asyncExecutionController.registerMetrics(getRuntimeContext().getMetricGroup()); + // 2. 
delta-join operator metrics + getRuntimeContext() + .getMetricGroup() + .gauge("delta_join_op_total_in_flight_num", totalInflightNum::get); + getRuntimeContext().getMetricGroup().gauge("delta_join_async_io_time", asyncIOTime::get); + + // asyncBufferCapacity + 1 as the queue size in order to avoid + // blocking on the queue when taking a collector. + this.resultHandlerBuffer = new ArrayBlockingQueue<>(capacity + 1); + this.allResultHandlers = new ArrayList<>(capacity + 1); + for (int i = 0; i < capacity + 1; i++) { + ReusableKeyedResultHandler reusableKeyedResultHandler = + new ReusableKeyedResultHandler(resultHandlerBuffer); + // add will throw exception immediately if the queue is full which should never happen + resultHandlerBuffer.add(reusableKeyedResultHandler); + allResultHandlers.add(reusableKeyedResultHandler); + } + + List<Object> keys = getAllStateKeys(); + for (Object key : keys) { + setCurrentKey(key); + triggerRecoveryProcess(); + } + } + + @Override + public void processElement1(StreamRecord<RowData> element) throws Exception { + processElement(element, LEFT_INPUT_INDEX); + } + + @Override + public void processElement2(StreamRecord<RowData> element) throws Exception { + processElement(element, RIGHT_INPUT_INDEX); + } + + public void emitWatermark(Watermark mark) { + // place the action of emission in mailbox instead do it right now + timestampedCollector.emitWatermark(mark); + } + + public void invoke(AecRecord<RowData, RowData> element) throws Exception { + final ReusableKeyedResultHandler resultHandler = resultHandlerBuffer.take(); + resultHandler.reset(element); + boolean isLeft = isLeft(element.getInputIndex()); + // register a timeout for the entry if timeout is configured + if (timeout > 0L) { + resultHandler.registerTimeout(getProcessingTimeService(), timeout, isLeft); + } + if (isLeft) { + leftUserFunction.asyncInvoke(element.getRecord().getValue(), resultHandler); + } else { + rightUserFunction.asyncInvoke(element.getRecord().getValue(), 
resultHandler); + } + } + + private void tryProcess() throws Exception { + while (totalInflightNum.get() >= capacity) { + LOG.debug( + "Failed to put element into asyncExecutionController because totalInflightNum is greater or equal to " + + "maxInflight ({}/{}).", + totalInflightNum.get(), + capacity); + mailboxExecutor.yield(); + } + totalInflightNum.incrementAndGet(); + } + + private void processElement(StreamRecord<RowData> element, int inputIndex) throws Exception { Review Comment: nit: add a `Preconditions` check to verify that all input records have `RowKind#INSERT` ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java: ########## @@ -0,0 +1,834 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.join.deltajoin; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.runtime.collector.TableFunctionCollector; +import org.apache.flink.table.runtime.collector.TableFunctionResultFuture; +import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper; +import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord; +import org.apache.flink.table.runtime.operators.join.lookup.keyordered.RecordsBuffer; +import org.apache.flink.table.runtime.operators.join.lookup.keyordered.TableAsyncExecutionController; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.BigIntType; +import 
org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test class for {@link StreamingDeltaJoinOperator}. */ +public class StreamingDeltaJoinOperatorTest { Review Comment: Adapt this test in the same way as DeltaJoinITCase ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/AsyncDeltaJoinRunner.java: ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.deltajoin; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.collector.TableFunctionResultFuture; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.generated.GeneratedResultFuture; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +/** The async join runner to look up the dimension table in delta join. 
*/ +public class AsyncDeltaJoinRunner extends RichAsyncFunction<RowData, RowData> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDeltaJoinRunner.class); + + private static final long serialVersionUID = 1L; + + private final GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher; + private final DataStructureConverter<RowData, Object> fetcherConverter; + private final GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture; + private final int asyncBufferCapacity; + + private transient AsyncFunction<RowData, Object> fetcher; + + protected final RowDataSerializer lookupSideRowSerializer; + + private final boolean treatRightAsLookupTable; + + /** + * Buffers {@link ResultFuture} to avoid newInstance cost when processing elements every time. + * We use {@link BlockingQueue} to make sure the head {@link ResultFuture}s are available. + */ + private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer; + + /** + * A Collection contains all ResultFutures in the runner which is used to invoke {@code close()} + * on every ResultFuture. {@link #resultFutureBuffer} may not contain all the ResultFutures + * because ResultFutures will be polled from the buffer when processing. 
+ */ + private transient List<JoinedRowResultFuture> allResultFutures; + + // metrics + private long callAsyncFetchCostTime = 0L; + + public AsyncDeltaJoinRunner( + GeneratedFunction<AsyncFunction<RowData, Object>> generatedFetcher, + DataStructureConverter<RowData, Object> fetcherConverter, + GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture, + RowDataSerializer lookupSideRowSerializer, + int asyncBufferCapacity, + boolean treatRightAsLookupTable) { + this.generatedFetcher = generatedFetcher; + this.fetcherConverter = fetcherConverter; + this.generatedResultFuture = generatedResultFuture; + this.lookupSideRowSerializer = lookupSideRowSerializer; + this.asyncBufferCapacity = asyncBufferCapacity; + this.treatRightAsLookupTable = treatRightAsLookupTable; + } + + @Override + public void open(OpenContext openContext) throws Exception { + super.open(openContext); + + this.fetcher = generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader()); + FunctionUtils.setFunctionRuntimeContext(fetcher, getRuntimeContext()); + FunctionUtils.openFunction(fetcher, openContext); + + // try to compile the generated ResultFuture, fail fast if the code is corrupt. + generatedResultFuture.compile(getRuntimeContext().getUserCodeClassLoader()); + + fetcherConverter.open(getRuntimeContext().getUserCodeClassLoader()); + + // asyncBufferCapacity + 1 as the queue size in order to avoid + // blocking on the queue when taking a collector. 
+ this.resultFutureBuffer = new ArrayBlockingQueue<>(asyncBufferCapacity + 1); + this.allResultFutures = new ArrayList<>(); + LOG.info( + "Begin to initialize reusable result futures with size {}", + asyncBufferCapacity + 1); + for (int i = 0; i < asyncBufferCapacity + 1; i++) { + JoinedRowResultFuture rf = + new JoinedRowResultFuture( + resultFutureBuffer, + createFetcherResultFuture(openContext), + fetcherConverter, + treatRightAsLookupTable); + // add will throw exception immediately if the queue is full which should never happen + resultFutureBuffer.add(rf); + allResultFutures.add(rf); + } + LOG.info("Finish initializing reusable result futures"); + + getRuntimeContext() + .getMetricGroup() + .gauge( + treatRightAsLookupTable + ? "delta_join_left_call_async_fetch_cost_time" Review Comment: ditto, use static final constant string fields. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org