This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 58035ea658e [FLINK-38423][table-api] Add VECTOR_SEARCH connector API (#27037)
58035ea658e is described below

commit 58035ea658e8bb3c0db5128280f958f1e572ad4a
Author: Shengkai <[email protected]>
AuthorDate: Tue Oct 14 14:34:09 2025 +0800

    [FLINK-38423][table-api] Add VECTOR_SEARCH connector API (#27037)
---
 .../connector/source/VectorSearchTableSource.java  | 118 +++++++++++++++++++++
 .../search/AsyncVectorSearchFunctionProvider.java  |  38 +++++++
 .../search/VectorSearchFunctionProvider.java       |  37 +++++++
 .../table/functions/AsyncVectorSearchFunction.java |  69 ++++++++++++
 .../table/functions/VectorSearchFunction.java      |  65 ++++++++++++
 5 files changed, 327 insertions(+)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
new file mode 100644
index 00000000000..0e7e6279bb1
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider;
+import 
org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider;
+import org.apache.flink.types.RowKind;
+
+import java.io.Serializable;
+
+/**
+ * A {@link DynamicTableSource} that searches rows of an external storage system by one or more
+ * vectors during runtime.
+ *
+ * <p>Compared to {@link ScanTableSource}, the source does not have to read the entire table and can
+ * lazily fetch individual values from a (possibly continuously changing) external table when
+ * necessary.
+ *
+ * <p>Note: Compared to {@link ScanTableSource}, a {@link VectorSearchTableSource} only supports
+ * emitting insert-only changes (see also {@link RowKind}).
+ *
+ * <p>In the last step, the planner will call {@link #getSearchRuntimeProvider(VectorSearchContext)}
+ * to obtain a provider of the runtime implementation. The search fields that are required to
+ * perform a search are derived from a query by the planner and will be provided in the given
+ * {@link VectorSearchTableSource.VectorSearchContext#getSearchColumns()}. The values for those
+ * search fields are passed at runtime.
+ */
+@PublicEvolving
+public interface VectorSearchTableSource extends DynamicTableSource {
+
+    /**
+     * Returns a {@link VectorSearchRuntimeProvider} that supplies the runtime implementation of
+     * the vector search.
+     *
+     * <p>There exist different interfaces for runtime implementation which is why {@link
+     * VectorSearchRuntimeProvider} serves as the base interface. Concrete providers such as {@link
+     * VectorSearchFunctionProvider} and {@link AsyncVectorSearchFunctionProvider} extend it.
+     *
+     * <p>Independent of the provider interface, a source implementation can work on either
+     * arbitrary objects or internal data structures (see {@link org.apache.flink.table.data} for
+     * more information).
+     *
+     * <p>The given {@link VectorSearchContext} offers utilities for the planner to create runtime
+     * implementation with minimal dependencies to internal data structures.
+     *
+     * @param context context exposing the search columns and the runtime configuration
+     * @return the runtime provider for this vector search
+     * @see VectorSearchFunctionProvider
+     * @see AsyncVectorSearchFunctionProvider
+     */
+    VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context);
+
+    // --------------------------------------------------------------------------------------------
+    // Helper interfaces
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Context for creating runtime implementation via a {@link VectorSearchRuntimeProvider}.
+     *
+     * <p>It offers utilities for the planner to create runtime implementation with minimal
+     * dependencies to internal data structures.
+     *
+     * <p>Methods should be called in {@link #getSearchRuntimeProvider(VectorSearchContext)}.
+     * Returned instances that are {@link Serializable} can be directly passed into the runtime
+     * implementation class.
+     */
+    @PublicEvolving
+    interface VectorSearchContext extends DynamicTableSource.Context {
+
+        /**
+         * Returns an array of column index paths that should be used during the search. The
+         * indices are 0-based and support composite keys within (possibly nested) structures.
+         *
+         * <p>For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2
+         * INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and
+         * {@code s2} are used for performing a search.
+         *
+         * @return array of column index paths
+         */
+        int[][] getSearchColumns();
+
+        /**
+         * Runtime config provided to the provider. The config can be used by the planner or vector
+         * search provider at runtime. For example, async options can be used by the planner to
+         * choose an async search. Other config such as HTTP timeout or retry can be used to
+         * configure search functions.
+         *
+         * @return the runtime configuration
+         */
+        ReadableConfig runtimeConfig();
+    }
+
+    /**
+     * Provides actual runtime implementation for reading the data.
+     *
+     * <p>There exist different interfaces for runtime implementation which is why {@link
+     * VectorSearchRuntimeProvider} serves as the base interface.
+     *
+     * @see VectorSearchFunctionProvider
+     * @see AsyncVectorSearchFunctionProvider
+     */
+    @PublicEvolving
+    interface VectorSearchRuntimeProvider {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
new file mode 100644
index 00000000000..9dd7a5083dc
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.search;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.functions.AsyncVectorSearchFunction;
+
+/**
+ * Runtime provider that hands an {@link AsyncVectorSearchFunction} to the planner when a {@link
+ * VectorSearchTableSource} is searched asynchronously.
+ */
+@PublicEvolving
+public interface AsyncVectorSearchFunctionProvider
+        extends VectorSearchTableSource.VectorSearchRuntimeProvider {
+
+    /**
+     * Creates the asynchronous search function used at runtime.
+     *
+     * @return an {@link AsyncVectorSearchFunction} instance
+     */
+    AsyncVectorSearchFunction createAsyncVectorSearchFunction();
+
+    /**
+     * Wraps an already constructed function in a provider that always returns that same instance.
+     *
+     * @param asyncVectorSearchFunction the function to hand out
+     * @return a provider returning {@code asyncVectorSearchFunction} on every call
+     */
+    static AsyncVectorSearchFunctionProvider of(
+            AsyncVectorSearchFunction asyncVectorSearchFunction) {
+        final AsyncVectorSearchFunction fixedFunction = asyncVectorSearchFunction;
+        return () -> fixedFunction;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
new file mode 100644
index 00000000000..fe50ad585df
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.search;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.VectorSearchTableSource;
+import org.apache.flink.table.functions.VectorSearchFunction;
+
+/**
+ * Runtime provider that hands a {@link VectorSearchFunction} to the planner when a {@link
+ * VectorSearchTableSource} is searched synchronously.
+ */
+@PublicEvolving
+public interface VectorSearchFunctionProvider
+        extends VectorSearchTableSource.VectorSearchRuntimeProvider {
+
+    /**
+     * Creates the synchronous search function used at runtime.
+     *
+     * @return a {@link VectorSearchFunction} instance
+     */
+    VectorSearchFunction createVectorSearchFunction();
+
+    /**
+     * Wraps an already constructed function in a provider that always returns that same instance.
+     *
+     * @param searchFunction the function to hand out
+     * @return a provider returning {@code searchFunction} on every call
+     */
+    static VectorSearchFunctionProvider of(VectorSearchFunction searchFunction) {
+        final VectorSearchFunction fixedFunction = searchFunction;
+        return () -> fixedFunction;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
new file mode 100644
index 00000000000..fdad0a50ecc
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class of {@link AsyncTableFunction} for asynchronous vector search.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class AsyncVectorSearchFunction extends AsyncTableFunction<RowData> {
+
+    /**
+     * Asynchronously searches for the top-K rows that best match the given query.
+     *
+     * @param topK - The number of top matched rows to return.
+     * @param queryData - A {@link RowData} that wraps the query input of the search function.
+     * @return A future that completes with a collection of all searched results.
+     */
+    public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch(
+            int topK, RowData queryData);
+
+    /**
+     * Invokes {@link #asyncVectorSearch} and chains futures.
+     *
+     * <p>The first argument is interpreted as {@code topK}. The remaining arguments are packed, in
+     * order, into the query row passed to {@link #asyncVectorSearch}. If the search future fails,
+     * {@code future} is completed exceptionally with a {@link TableException} that wraps the
+     * original failure. Otherwise {@code future} is completed with the search result.
+     *
+     * @param future the future to complete with the search results
+     * @param args {@code topK} followed by the query arguments
+     */
+    public void eval(CompletableFuture<Collection<RowData>> future, Object... args) {
+        int topK = (int) args[0];
+        GenericRowData argsData = new GenericRowData(args.length - 1);
+        for (int i = 1; i < args.length; ++i) {
+            // args[0] holds topK, so query field (i - 1) comes from args[i]; writing to index i
+            // would skip field 0 and overflow the row's arity on the last argument.
+            argsData.setField(i - 1, args[i]);
+        }
+        asyncVectorSearch(topK, argsData)
+                .whenComplete(
+                        (result, exception) -> {
+                            if (exception != null) {
+                                future.completeExceptionally(
+                                        new TableException(
+                                                String.format(
+                                                        "Failed to execute asynchronously search with input row %s.",
+                                                        argsData),
+                                                exception));
+                                return;
+                            }
+                            future.complete(result);
+                        });
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
new file mode 100644
index 00000000000..8f71e3f2776
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper class of {@link TableFunction} for synchronous vector search.
+ *
+ * <p>The output type of this table function is fixed as {@link RowData}.
+ */
+@PublicEvolving
+public abstract class VectorSearchFunction extends TableFunction<RowData> {
+
+    /**
+     * Synchronously searches for the top-K rows that best match the given query.
+     *
+     * @param topK - The number of top results to return.
+     * @param queryData - A {@link RowData} that wraps the query input of the vector search
+     *     function.
+     * @return A collection of searched results; {@code null} is treated by {@link #eval} as no
+     *     results.
+     * @throws IOException declared for search implementations; {@link #eval} wraps it in a {@link
+     *     FlinkRuntimeException}.
+     */
+    public abstract Collection<RowData> vectorSearch(int topK, RowData queryData)
+            throws IOException;
+
+    /**
+     * Invokes {@link #vectorSearch} and handles exceptions.
+     *
+     * <p>The first argument is interpreted as {@code topK}. The remaining arguments are packed, in
+     * order, into the query row passed to {@link #vectorSearch}. Every returned row is emitted via
+     * {@code collect}. Exceptions raised by the search or while emitting its results are rethrown
+     * as a {@link FlinkRuntimeException}.
+     *
+     * @param args {@code topK} followed by the query arguments
+     */
+    public final void eval(Object... args) {
+        int topK = (int) args[0];
+        GenericRowData argsData = new GenericRowData(args.length - 1);
+        for (int i = 1; i < args.length; ++i) {
+            // args[0] holds topK, so query field (i - 1) comes from args[i]; writing to index i
+            // would skip field 0 and overflow the row's arity on the last argument.
+            argsData.setField(i - 1, args[i]);
+        }
+        try {
+            Collection<RowData> results = vectorSearch(topK, argsData);
+            if (results == null) {
+                return;
+            }
+            results.forEach(this::collect);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    String.format("Failed to execute search with input row %s.", argsData), e);
+        }
+    }
+}

Reply via email to