This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 52155584e17 [FLINK-38528][table] Introduce async vector search
operator (#27126)
52155584e17 is described below
commit 52155584e17a33ee6ea0de3eb97dc4568d9c8173
Author: Shengkai <[email protected]>
AuthorDate: Wed Oct 22 17:14:42 2025 +0800
[FLINK-38528][table] Introduce async vector search operator (#27126)
---
.../StreamExecVectorSearchTableFunction.java | 70 ++++++-
.../codegen/VectorSearchCodeGenerator.scala | 28 +++
.../factories/TestValuesRuntimeFunctions.java | 50 +++++
.../planner/factories/TestValuesTableFactory.java | 91 +++++++-
.../stream/table/AsyncVectorSearchITCase.java | 232 +++++++++++++++++++++
.../operators/search/AsyncVectorSearchRunner.java | 147 +++++++++++++
6 files changed, 606 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
index 9dcb90b2473..c0072e67f9e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecVectorSearchTableFunction.java
@@ -23,19 +23,23 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.connector.source.VectorSearchTableSource;
import
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
import
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.functions.VectorSearchFunction;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.FunctionCallCodeGenerator;
import org.apache.flink.table.planner.codegen.VectorSearchCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -55,9 +59,11 @@ import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollector;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.operators.search.AsyncVectorSearchRunner;
import org.apache.flink.table.runtime.operators.search.VectorSearchRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.core.JoinRelType;
@@ -116,17 +122,27 @@ public class StreamExecVectorSearchTableFunction extends
ExecNodeBase<RowData>
// 3. build the operator
RowType inputType = (RowType) inputEdge.getOutputType();
RowType outputType = (RowType) getOutputType();
+ DataTypeFactory dataTypeFactory =
+ ShortcutUtils.unwrapContext(planner.getFlinkContext())
+ .getCatalogManager()
+ .getDataTypeFactory();
StreamOperatorFactory<RowData> operatorFactory =
isAsyncEnabled
- ? createAsyncVectorSearchOperator()
+ ? createAsyncVectorSearchOperator(
+ searchTable,
+ config,
+ planner.getFlinkContext().getClassLoader(),
+ (AsyncVectorSearchFunction)
vectorSearchFunction,
+ dataTypeFactory,
+ inputType,
+ vectorSearchSpec.getOutputType(),
+ outputType)
: createSyncVectorSearchOperator(
searchTable,
config,
planner.getFlinkContext().getClassLoader(),
(VectorSearchFunction) vectorSearchFunction,
-
ShortcutUtils.unwrapContext(planner.getFlinkContext())
- .getCatalogManager()
- .getDataTypeFactory(),
+ dataTypeFactory,
inputType,
vectorSearchSpec.getOutputType(),
outputType);
@@ -225,7 +241,49 @@ public class StreamExecVectorSearchTableFunction extends
ExecNodeBase<RowData>
searchOutputType.getFieldCount());
}
- private SimpleOperatorFactory<RowData> createAsyncVectorSearchOperator() {
- throw new UnsupportedOperationException("Async vector search is not
supported yet.");
+ @SuppressWarnings("unchecked")
+ private StreamOperatorFactory<RowData> createAsyncVectorSearchOperator(
+ RelOptTable searchTable,
+ ExecNodeConfig config,
+ ClassLoader jobClassLoader,
+ AsyncVectorSearchFunction vectorSearchFunction,
+ DataTypeFactory dataTypeFactory,
+ RowType inputType,
+ RowType searchOutputType,
+ RowType outputType) {
+ ArrayList<FunctionCallUtil.FunctionParam> parameters =
+ new ArrayList<>(1 +
vectorSearchSpec.getSearchColumns().size());
+ parameters.add(vectorSearchSpec.getTopK());
+ parameters.addAll(vectorSearchSpec.getSearchColumns().values());
+
+
FunctionCallCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData,
Object>>
+ generatedFetcher =
+
VectorSearchCodeGenerator.generateAsyncVectorSearchFunction(
+ config,
+ jobClassLoader,
+ dataTypeFactory,
+ inputType,
+ searchOutputType,
+ outputType,
+ parameters,
+ vectorSearchFunction,
+ ((TableSourceTable) searchTable)
+ .contextResolvedTable()
+ .getIdentifier()
+ .asSummaryString());
+
+ boolean isLeftOuterJoin = vectorSearchSpec.getJoinType() ==
JoinRelType.LEFT;
+
+ Preconditions.checkNotNull(asyncOptions, "Async Options can not be
null.");
+
+ return new AsyncWaitOperatorFactory<>(
+ new AsyncVectorSearchRunner(
+ (GeneratedFunction) generatedFetcher.tableFunc(),
+ isLeftOuterJoin,
+ asyncOptions.asyncBufferCapacity,
+ searchOutputType.getFieldCount()),
+ asyncOptions.asyncTimeout,
+ asyncOptions.asyncBufferCapacity,
+ asyncOptions.asyncOutputMode);
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
index 1b533b366f3..87399303699 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/VectorSearchCodeGenerator.scala
@@ -19,9 +19,11 @@ package org.apache.flink.table.planner.codegen
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.ReadableConfig
+import org.apache.flink.streaming.api.functions.async.AsyncFunction
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.data.RowData
import org.apache.flink.table.functions._
+import
org.apache.flink.table.planner.codegen.FunctionCallCodeGenerator.GeneratedTableFunctionWithDataType
import org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil
import org.apache.flink.table.planner.functions.inference.FunctionCallContext
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil.FunctionParam
@@ -68,6 +70,32 @@ object VectorSearchCodeGenerator {
.tableFunc
}
+ /** Generates an async vector search function ([[AsyncTableFunction]]) */
+ def generateAsyncVectorSearchFunction(
+ tableConfig: ReadableConfig,
+ classLoader: ClassLoader,
+ dataTypeFactory: DataTypeFactory,
+ inputType: LogicalType,
+ searchOutputType: LogicalType,
+ outputType: LogicalType,
+ searchColumns: util.List[FunctionParam],
+ asyncVectorSearchFunction: AsyncTableFunction[_],
+ functionName: String):
GeneratedTableFunctionWithDataType[AsyncFunction[RowData, AnyRef]] = {
+ FunctionCallCodeGenerator.generateAsyncFunctionCall(
+ tableConfig,
+ classLoader,
+ dataTypeFactory,
+ inputType,
+ searchOutputType,
+ outputType,
+ searchColumns,
+ asyncVectorSearchFunction,
+ generateCallWithDataType(functionName, searchOutputType),
+ functionName,
+ "AsyncVectorSearchFunction"
+ )
+ }
+
private def generateCallWithDataType(
functionName: String,
searchOutputType: LogicalType
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index 7f69919e3ef..df439bd2c9e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -54,6 +54,7 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.functions.VectorSearchFunction;
@@ -74,6 +75,8 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.RelativeClock;
import org.apache.flink.util.clock.SystemClock;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -1171,4 +1174,51 @@ public final class TestValuesRuntimeFunctions {
return sum;
}
}
+
+ public static class TestValueAsyncVectorSearchFunction extends
AsyncVectorSearchFunction {
+
+ private final TestValueVectorSearchFunction impl;
+ private final @Nullable Integer latency;
+ private transient ExecutorService executors;
+ private transient Random random;
+
+ public TestValueAsyncVectorSearchFunction(
+ List<Row> data,
+ int[] searchIndices,
+ DataType physicalRowType,
+ @Nullable Integer latency) {
+ this.impl = new TestValueVectorSearchFunction(data, searchIndices,
physicalRowType);
+ this.latency = latency;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ impl.open(context);
+ executors = Executors.newCachedThreadPool();
+ random = new Random();
+ }
+
+ @Override
+ public CompletableFuture<Collection<RowData>> asyncVectorSearch(
+ int topK, RowData queryData) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Thread.sleep(latency == null ?
random.nextInt(1000) : latency);
+ return impl.vectorSearch(topK, queryData);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ executors);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ impl.close();
+ executors.shutdown();
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index d31ef6531ea..cebde11ac66 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -83,6 +83,7 @@ import
org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import
org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import
org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
+import
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
import
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -95,9 +96,11 @@ import
org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.VectorSearchFunction;
import org.apache.flink.table.legacy.api.TableSchema;
import org.apache.flink.table.legacy.api.WatermarkSpec;
import
org.apache.flink.table.legacy.connector.source.AsyncTableFunctionProvider;
@@ -501,6 +504,14 @@ public final class TestValuesTableFactory
"Option to specify the amount of time to sleep
after processing every N elements. "
+ "The default value is 0, which means
that no sleep is performed");
+ public static final ConfigOption<Integer> LATENCY =
+ ConfigOptions.key("latency")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Latency in milliseconds for async vector search
call for each row. "
+ + "If not set, the default is random
between 0ms and 1000ms.");
+
/**
* Parse partition list from Options with the format as
* "key1:val1,key2:val2;key1:val3,key2:val4".
@@ -654,7 +665,9 @@ public final class TestValuesTableFactory
readableMetadata,
null,
parallelism,
- enableAggregatePushDown);
+ enableAggregatePushDown,
+ isAsync,
+ helper.getOptions().get(LATENCY));
}
if (disableLookup) {
@@ -888,7 +901,8 @@ public final class TestValuesTableFactory
FULL_CACHE_PERIODIC_RELOAD_INTERVAL,
FULL_CACHE_PERIODIC_RELOAD_SCHEDULE_MODE,
FULL_CACHE_TIMED_RELOAD_ISO_TIME,
- FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS));
+ FULL_CACHE_TIMED_RELOAD_INTERVAL_IN_DAYS,
+ LATENCY));
}
private static int validateAndExtractRowtimeIndex(
@@ -1054,7 +1068,7 @@ public final class TestValuesTableFactory
private @Nullable int[] groupingSet;
private List<AggregateExpression> aggregateExpressions;
private List<String> acceptedPartitionFilterFields;
- private final Integer parallelism;
+ protected final Integer parallelism;
private TestValuesScanTableSourceWithoutProjectionPushDown(
DataType producedDataType,
@@ -2247,6 +2261,9 @@ public final class TestValuesTableFactory
extends TestValuesScanTableSourceWithoutProjectionPushDown
implements VectorSearchTableSource {
+ private final boolean isAsync;
+ @Nullable private final Integer latency;
+
private TestValuesVectorSearchTableSourceWithoutProjectionPushDown(
DataType producedDataType,
ChangelogMode changelogMode,
@@ -2266,7 +2283,9 @@ public final class TestValuesTableFactory
Map<String, DataType> readableMetadata,
@Nullable int[] projectedMetadataFields,
@Nullable Integer parallelism,
- boolean enableAggregatePushDown) {
+ boolean enableAggregatePushDown,
+ boolean isAsync,
+ @Nullable Integer latency) {
super(
producedDataType,
changelogMode,
@@ -2287,6 +2306,8 @@ public final class TestValuesTableFactory
projectedMetadataFields,
parallelism,
enableAggregatePushDown);
+ this.isAsync = isAsync;
+ this.latency = latency;
}
@Override
@@ -2295,9 +2316,67 @@ public final class TestValuesTableFactory
Arrays.stream(context.getSearchColumns()).mapToInt(k ->
k[0]).toArray();
Collection<Row> rows =
data.getOrDefault(Collections.emptyMap(),
Collections.emptyList());
- return VectorSearchFunctionProvider.of(
+ TestValuesRuntimeFunctions.TestValueVectorSearchFunction
searchFunction =
new
TestValuesRuntimeFunctions.TestValueVectorSearchFunction(
- new ArrayList<>(rows), searchColumns,
producedDataType));
+ new ArrayList<>(rows), searchColumns,
producedDataType);
+
+ if (isAsync) {
+ return new VectorFunctionProvider(
+ new
TestValuesRuntimeFunctions.TestValueAsyncVectorSearchFunction(
+ new ArrayList<>(rows), searchColumns,
producedDataType, latency),
+ searchFunction);
+ } else {
+ return VectorSearchFunctionProvider.of(searchFunction);
+ }
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return new
TestValuesVectorSearchTableSourceWithoutProjectionPushDown(
+ producedDataType,
+ changelogMode,
+ boundedness,
+ terminating,
+ runtimeSource,
+ failingSource,
+ data,
+ nestedProjectionSupported,
+ projectedPhysicalFields,
+ filterPredicates,
+ filterableFields,
+ dynamicFilteringFields,
+ numElementToSkip,
+ limit,
+ allPartitions,
+ readableMetadata,
+ projectedMetadataFields,
+ parallelism,
+ enableAggregatePushDown,
+ isAsync,
+ latency);
+ }
+
+ private static class VectorFunctionProvider
+ implements AsyncVectorSearchFunctionProvider,
VectorSearchFunctionProvider {
+
+ private final AsyncVectorSearchFunction asyncFunction;
+ private final VectorSearchFunction syncFunction;
+
+ public VectorFunctionProvider(
+ AsyncVectorSearchFunction asyncFunction,
VectorSearchFunction syncFunction) {
+ this.asyncFunction = asyncFunction;
+ this.syncFunction = syncFunction;
+ }
+
+ @Override
+ public AsyncVectorSearchFunction createAsyncVectorSearchFunction()
{
+ return asyncFunction;
+ }
+
+ @Override
+ public VectorSearchFunction createVectorSearchFunction() {
+ return syncFunction;
+ }
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
new file mode 100644
index 00000000000..1b67730805e
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncVectorSearchITCase.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThatList;
+
+/** ITCase for async VECTOR_SEARCH. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class AsyncVectorSearchITCase extends StreamingWithStateTestBase {
+
+ public AsyncVectorSearchITCase(StateBackendMode state) {
+ super(state);
+ }
+
+ private final List<Row> data =
+ Arrays.asList(
+ Row.of(1L, new Float[] {5f, 12f, 13f}),
+ Row.of(2L, new Float[] {11f, 60f, 61f}),
+ Row.of(3L, new Float[] {8f, 15f, 17f}));
+
+ private final List<Row> nullableData =
+ Arrays.asList(Row.of(1L, new Float[] {5f, 12f, 13f}), Row.of(4L,
null));
+
+ @BeforeEach
+ public void before() {
+ super.before();
+ createTable("src", data);
+ createTable("nullableSrc", nullableData);
+ createTable("vector", data);
+ }
+
+ @TestTemplate
+ void testSimple() {
+ List<Row> actual =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql(
+ "SELECT * FROM src, LATERAL
TABLE(VECTOR_SEARCH(TABLE vector, DESCRIPTOR(`vector`), src.vector, 2))")
+ .collect());
+ assertThatList(actual)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 1.0),
+ Row.of(
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 3L,
+ new Float[] {8f, 15f, 17f},
+ 0.9977375565610862),
+ Row.of(
+ 2L,
+ new Float[] {11f, 60f, 61f},
+ 2L,
+ new Float[] {11f, 60f, 61f},
+ 1.0),
+ Row.of(
+ 2L,
+ new Float[] {11f, 60f, 61f},
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 0.9886506935687265),
+ Row.of(
+ 3L,
+ new Float[] {8f, 15f, 17f},
+ 3L,
+ new Float[] {8f, 15f, 17f},
+ 1.0000000000000002),
+ Row.of(
+ 3L,
+ new Float[] {8f, 15f, 17f},
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 0.9977375565610862));
+ }
+
+ @TestTemplate
+ void testLeftLateralJoin() {
+ List<Row> actual =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql(
+ "SELECT * FROM nullableSrc LEFT JOIN
LATERAL TABLE(VECTOR_SEARCH(TABLE vector, DESCRIPTOR(`vector`),
nullableSrc.vector, 2)) ON TRUE")
+ .collect());
+ assertThatList(actual)
+ .containsExactlyInAnyOrder(
+ Row.of(
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 1.0),
+ Row.of(
+ 1L,
+ new Float[] {5.0f, 12.0f, 13.0f},
+ 3L,
+ new Float[] {8f, 15f, 17f},
+ 0.9977375565610862),
+ Row.of(4L, null, null, null, null));
+ }
+
+ @TestTemplate
+ void testTimeout() {
+ tEnv().getConfig()
+ .set(
+
ExecutionConfigOptions.TABLE_EXEC_ASYNC_VECTOR_SEARCH_TIMEOUT,
+ Duration.ofMillis(100));
+ assertThatThrownBy(
+ () ->
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql(
+ "SELECT * FROM
nullableSrc LEFT JOIN LATERAL TABLE(VECTOR_SEARCH(TABLE vector,
DESCRIPTOR(`vector`), nullableSrc.vector, 2)) ON TRUE")
+ .collect()))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ TimeoutException.class, "Async function call
has timed out."));
+ }
+
+ @TestTemplate
+ void testVectorSearchWithCalc() {
+ assertThatThrownBy(
+ () ->
+ tEnv().executeSql(
+ "SELECT * FROM nullableSrc\n "
+ + "LEFT JOIN LATERAL
TABLE(VECTOR_SEARCH((SELECT id+1, vector FROM vector), DESCRIPTOR(`vector`),
nullableSrc.vector, 2)) ON TRUE"))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Don't support calc on VECTOR_SEARCH node
now."));
+ }
+
+ @Parameters(name = "backend = {0}, objectReuse = {1}, asyncOutputMode =
{2}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[][] {
+ {
+ StreamingWithStateTestBase.HEAP_BACKEND(),
+ true,
+ ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+ },
+ {
+ StreamingWithStateTestBase.HEAP_BACKEND(),
+ true,
+ ExecutionConfigOptions.AsyncOutputMode.ORDERED
+ },
+ {
+ StreamingWithStateTestBase.HEAP_BACKEND(),
+ false,
+ ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+ },
+ {
+ StreamingWithStateTestBase.HEAP_BACKEND(),
+ false,
+ ExecutionConfigOptions.AsyncOutputMode.ORDERED
+ },
+ {
+ StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+ true,
+ ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+ },
+ {
+ StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+ true,
+ ExecutionConfigOptions.AsyncOutputMode.ORDERED
+ },
+ {
+ StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+ false,
+ ExecutionConfigOptions.AsyncOutputMode.ALLOW_UNORDERED
+ },
+ {
+ StreamingWithStateTestBase.ROCKSDB_BACKEND(),
+ false,
+ ExecutionConfigOptions.AsyncOutputMode.ORDERED
+ }
+ });
+ }
+
+ private void createTable(String tableName, List<Row> data) {
+ String dataId = TestValuesTableFactory.registerData(data);
+ tEnv().executeSql(
+ String.format(
+ "CREATE TABLE `%s`(\n"
+ + " id BIGINT,\n"
+ + " vector ARRAY<FLOAT>\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'enable-vector-search' =
'true',\n"
+ + " 'data-id' = '%s',\n"
+ + " 'async' = 'true',\n"
+ + " 'latency' = '1000'"
+ + ")",
+ tableName, dataId));
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/search/AsyncVectorSearchRunner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/search/AsyncVectorSearchRunner.java
new file mode 100644
index 00000000000..b9ff849d708
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/search/AsyncVectorSearchRunner.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.search;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.operators.AbstractAsyncFunctionRunner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Async function runner for {@link AsyncVectorSearchFunction}, which takes
the generated function,
+ * instantiates it, and then calls its lifecycle methods.
+ */
+public class AsyncVectorSearchRunner extends
AbstractAsyncFunctionRunner<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isLeftOuterJoin;
+ private final int asyncBufferCapacity;
+ private final int searchTableFieldCount;
+
+ /**
+ * Buffers {@link ResultFuture} instances to avoid the cost of creating a new
instance for every processed element.
+ * We use {@link BlockingQueue} to make sure the head {@link
ResultFuture}s are available.
+ */
+ private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
+
+ public AsyncVectorSearchRunner(
+ GeneratedFunction<AsyncFunction<RowData, RowData>>
generatedFetcher,
+ boolean isLeftOuterJoin,
+ int asyncBufferCapacity,
+ int searchTableFieldCount) {
+ super(generatedFetcher);
+ this.isLeftOuterJoin = isLeftOuterJoin;
+ this.asyncBufferCapacity = asyncBufferCapacity;
+ this.searchTableFieldCount = searchTableFieldCount;
+ }
+
+ @Override
+ public void open(OpenContext openContext) throws Exception {
+ super.open(openContext);
+ this.resultFutureBuffer = new ArrayBlockingQueue<>(asyncBufferCapacity
+ 1);
+ // asyncBufferCapacity + 1 as the queue size in order to avoid
+ // blocking on the queue when taking a collector.
+ for (int i = 0; i < asyncBufferCapacity + 1; i++) {
+ resultFutureBuffer.add(
+ new JoinedRowResultFuture(
+ resultFutureBuffer, isLeftOuterJoin,
searchTableFieldCount));
+ }
+ }
+
+ @Override
+ public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture)
throws Exception {
+ JoinedRowResultFuture wrapper = resultFutureBuffer.take();
+ wrapper.reset(input, resultFuture);
+ fetcher.asyncInvoke(input, wrapper);
+ }
+
+ private static final class JoinedRowResultFuture implements
ResultFuture<RowData> {
+
+ private final BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
+ private final boolean isLeftOuterJoin;
+ private final GenericRowData nullRow;
+
+ private RowData leftRow;
+ private ResultFuture<RowData> realOutput;
+
+ private JoinedRowResultFuture(
+ BlockingQueue<JoinedRowResultFuture> resultFutureBuffer,
+ boolean isLeftOuterJoin,
+ int searchTableArity) {
+ this.resultFutureBuffer = resultFutureBuffer;
+ this.isLeftOuterJoin = isLeftOuterJoin;
+ this.nullRow = new GenericRowData(searchTableArity);
+ }
+
+ public void reset(RowData leftRow, ResultFuture<RowData> realOutput) {
+ this.leftRow = leftRow;
+ this.realOutput = realOutput;
+ }
+
+ @Override
+ public void complete(Collection<RowData> result) {
+ if (result == null || result.isEmpty()) {
+ if (isLeftOuterJoin) {
+ RowData outRow = new JoinedRowData(leftRow.getRowKind(),
leftRow, nullRow);
+ realOutput.complete(Collections.singleton(outRow));
+ } else {
+ realOutput.complete(Collections.emptyList());
+ }
+ } else {
+ List<RowData> outRows = new ArrayList<>();
+ for (RowData right : result) {
+ RowData outRow = new JoinedRowData(leftRow.getRowKind(),
leftRow, right);
+ outRows.add(outRow);
+ }
+ realOutput.complete(outRows);
+ }
+ try {
+ // Return this collector to the queue so that it cannot be reused
+ // before the outRows it produced have been consumed.
+ resultFutureBuffer.put(this);
+ } catch (InterruptedException e) {
+ completeExceptionally(e);
+ }
+ }
+
+ @Override
+ public void completeExceptionally(Throwable error) {
+ realOutput.completeExceptionally(error);
+ }
+
+ @Override
+ public void complete(CollectionSupplier<RowData> supplier) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}