This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 0562bbe4 [doc] Update document for vector store. (#470)
0562bbe4 is described below
commit 0562bbe4c1d22282a787094dd22353ce59080307
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Jan 26 15:13:42 2026 +0800
[doc] Update document for vector store. (#470)
---
docs/content/docs/development/vector_stores.md | 510 ++++++++++++++++++++++---
1 file changed, 463 insertions(+), 47 deletions(-)
diff --git a/docs/content/docs/development/vector_stores.md b/docs/content/docs/development/vector_stores.md
index 1ef71d7c..a51a925c 100644
--- a/docs/content/docs/development/vector_stores.md
+++ b/docs/content/docs/development/vector_stores.md
@@ -32,41 +32,60 @@ This page covers semantic search using vector stores. Additional query modes (ke
Vector stores enable efficient storage, indexing, and retrieval of
high-dimensional embedding vectors alongside their associated documents. They
provide the foundation for semantic search capabilities in AI applications by
allowing fast similarity searches across large document collections.
+### Use Case
In Flink Agents, vector stores are essential for:
- **Document Retrieval**: Finding relevant documents based on semantic
similarity
- **Knowledge Base Search**: Querying large collections of information using
natural language
- **Retrieval-Augmented Generation (RAG)**: Providing context to language
models from vector-indexed knowledge
- **Semantic Similarity**: Comparing and ranking documents by meaning rather
than keywords
-## Getting Started
+### Concepts
+* **Document**: A piece of text together with its associated metadata.
+* **Collection**: A set of documents. It maps to a different concept in each vector store, e.g. an index in Elasticsearch or a collection in Chroma.
+
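The two concepts can be pictured as plain data structures. The following is a minimal sketch using Python dataclasses; `SketchDocument`, `SketchCollection`, and their fields are hypothetical stand-ins chosen for illustration (loosely following the `Document(id=..., content=..., metadata=...)` calls used on this page), not the Flink Agents classes.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class SketchDocument:
    """Hypothetical stand-in for a document: a piece of text plus metadata."""

    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    id: Optional[str] = None  # may be left unset and assigned by the store


@dataclass
class SketchCollection:
    """Hypothetical stand-in for a collection: a named set of documents."""

    name: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    documents: List[SketchDocument] = field(default_factory=list)


# A collection plays the role of an index in Elasticsearch or a collection in Chroma.
collection = SketchCollection(name="my_collection", metadata={"key1": "value1"})
collection.documents.append(
    SketchDocument(content="the first doc", metadata={"key": "value1"}, id="doc1")
)
```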
+## How to use
To use vector stores in your agents, you need to configure both a vector store
and an embedding model, then perform semantic search using structured queries.
-### Resource Decorators
+### Declare a vector store in an agent
-Flink Agents provides decorators to simplify vector store setup within agents:
+Flink Agents provides a decorator (Python) and an annotation (Java) to simplify vector store setup within agents:
{{< tabs "Resource Decorators" >}}
{{< tab "Python" >}}
-
-#### @vector_store
-
-The `@vector_store` decorator marks a method that creates a vector store.
Vector stores automatically integrate with embedding models for text-based
search.
-
+```python
+@vector_store
+@staticmethod
+def my_vector_store() -> ResourceDescriptor:
+    return ResourceDescriptor(
+        clazz=Constant.CHROMA_VECTOR_STORE,
+        embedding_model="embedding_model",
+        collection="my_chroma_store"
+    )
+```
{{< /tab >}}
{{< tab "Java" >}}
-
-#### @VectorStore
-
-The `@VectorStore` annotation marks a method that creates a vector store.
-
+```java
+@VectorStore
+public static ResourceDescriptor vectorStore() {
+    return ResourceDescriptor.Builder.newBuilder(Constant.ELASTICSEARCH_VECTOR_STORE)
+            .addInitialArgument("embedding_model", "embeddingModel")
+            .addInitialArgument("host", "http://localhost:9200")
+            .addInitialArgument("index", "my_documents")
+            .addInitialArgument("vector_field", "content_vector")
+            .addInitialArgument("dims", 1536)
+            .build();
+}
+```
{{< /tab >}}
{{< /tabs >}}
-### Query Objects
+### How to query the vector store
+
+#### Query Objects
Vector stores use structured query objects for consistent interfaces:
@@ -98,7 +117,7 @@ VectorStoreQuery query = new VectorStoreQuery(
{{< /tabs >}}
-### Query Results
+#### Query Results
When you execute a query, you receive a `VectorStoreQueryResult` object that
contains the search results:
@@ -131,6 +150,140 @@ VectorStoreQueryResult result = vectorStore.query(query);
{{< /tabs >}}
+### Manage collections
+
+Collections can be created, retrieved, or deleted dynamically during agent execution:
+* `get_or_create_collection`: Get a collection by name, creating it if it does not exist. Optional metadata can be attached to the collection.
+* `get_collection`: Get a collection by name. The collection must already have been created by Flink Agents.
+* `delete_collection`: Delete a collection by name.
+
+{{< hint info >}}
+Collection-level operations are only supported by vector stores that implement `CollectionManageableVectorStore`; currently these are Chroma and Elasticsearch.
+{{< /hint >}}
+
+{{< tabs "Collection level operations" >}}
+
+{{< tab "Python" >}}
+
+```python
+# get the vector store from runner context
+vector_store: CollectionManageableVectorStore = ctx.get_resource(
+    "vector_store", ResourceType.VECTOR_STORE
+)
+
+# create a collection
+collection: Collection = vector_store.get_or_create_collection(
+    "my_collection", metadata={"key1": "value1", "key2": "value2"}
+)
+# get the collection
+collection: Collection = vector_store.get_collection("my_collection")
+# get the collection metadata
+metadata = collection.metadata
+
+# delete the collection
+vector_store.delete_collection("my_collection")
+```
+
+{{< /tab >}}
+
+{{< tab "Java" >}}
+
+```java
+// get the vector store from runner context
+CollectionManageableVectorStore vectorStore =
+ (CollectionManageableVectorStore)
+ ctx.getResource("vector_store", ResourceType.VECTOR_STORE);
+
+// create a collection
+Collection collection = vectorStore.getOrCreateCollection(
+ "my_collection", Map.of("key1", "value1", "key2", "value2"));
+// get the collection
+collection = vectorStore.getCollection("my_collection");
+// get the collection metadata
+Map<String, Object> metadata = collection.getMetadata();
+
+// delete the collection
+vectorStore.deleteCollection("my_collection");
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
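The semantics listed above can be summarized with a small in-memory sketch. `InMemoryCollections` is a hypothetical class, not part of Flink Agents, and it represents a collection simply by its metadata dict. It only illustrates the difference between `get_or_create_collection` (create on miss) and `get_collection` (error on miss); whether a real store updates the metadata of an existing collection is left open, and this sketch assumes it is returned unchanged.

```python
from typing import Any, Dict, Optional


class InMemoryCollections:
    """Hypothetical in-memory sketch of the three collection-level operations."""

    def __init__(self) -> None:
        # collection name -> collection metadata
        self._collections: Dict[str, Dict[str, Any]] = {}

    def get_or_create_collection(
        self, name: str, metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        # Create on miss; in this sketch an existing collection is returned unchanged.
        if name not in self._collections:
            self._collections[name] = dict(metadata or {})
        return self._collections[name]

    def get_collection(self, name: str) -> Dict[str, Any]:
        # Unlike get_or_create_collection, a missing collection is an error here.
        if name not in self._collections:
            raise KeyError(f"collection {name!r} does not exist")
        return self._collections[name]

    def delete_collection(self, name: str) -> Dict[str, Any]:
        # Remove the collection and return its metadata.
        return self._collections.pop(name)


store = InMemoryCollections()
store.get_or_create_collection("my_collection", metadata={"key1": "value1"})
metadata = store.get_collection("my_collection")
store.delete_collection("my_collection")
```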
+
+
+### Manage documents
+Documents can be added, retrieved, or deleted dynamically during agent execution:
+* `add`: Add documents to a collection. A random ID is generated for any document whose ID is not specified.
+* `get`: Get documents from a collection by IDs. If no IDs are provided, all documents are returned.
+* `delete`: Delete documents from a collection by IDs. If no IDs are provided, all documents are deleted.
+
+{{< hint info >}}
+If no collection name is specified, document-level operations apply to the default collection configured by the vector store's initialization parameters.
+{{< /hint >}}
+
+{{< tabs "Document level operations" >}}
+
+{{< tab "Python" >}}
+
+```python
+# get the vector store from runner context
+vector_store: CollectionManageableVectorStore = ctx.get_resource(
+    "vector_store", ResourceType.VECTOR_STORE
+)
+
+# create or get a collection
+collection: Collection = vector_store.get_or_create_collection(
+    "my_collection", metadata={"key1": "value1", "key2": "value2"}
+)
+
+# add documents to the collection
+documents = [
+    Document(id="doc1", content="the first doc", metadata={"key": "value1"}),
+    Document(id="doc2", content="the second doc", metadata={"key": "value2"}),
+]
+vector_store.add(documents=documents, collection_name="my_collection")
+
+# get documents by IDs
+docs: List[Document] = vector_store.get(ids="doc2", collection_name="my_collection")
+# get all documents
+docs: List[Document] = vector_store.get(collection_name="my_collection")
+
+# delete documents by IDs
+vector_store.delete(ids=["doc1", "doc2"], collection_name="my_collection")
+# delete all documents
+vector_store.delete(collection_name="my_collection")
+```
+
+{{< /tab >}}
+
+{{< tab "Java" >}}
+
+```java
+// get the vector store from runner context
+BaseVectorStore vectorStore =
+        (BaseVectorStore) ctx.getResource("vectorStore", ResourceType.VECTOR_STORE);
+// create or get a collection
+Collection collection =
+        ((CollectionManageableVectorStore) vectorStore)
+                .getOrCreateCollection("my_collection", Map.of("key1", "value1", "key2", "value2"));
+
+// add documents to the collection
+List<Document> documents =
+        List.of(
+                new Document("the first doc.", Map.of("key", "value1"), "doc1"),
+                new Document("the second doc", Map.of("key", "value2"), "doc2"));
+vectorStore.add(documents, "my_collection", Collections.emptyMap());
+
+// get documents by IDs
+List<Document> docs = vectorStore.get(List.of("doc1"), "my_collection", Collections.emptyMap());
+// get all documents
+docs = vectorStore.get(null, "my_collection", Collections.emptyMap());
+
+// delete documents by IDs
+vectorStore.delete(List.of("doc1", "doc2"), "my_collection", Collections.emptyMap());
+// delete all documents
+vectorStore.delete(null, "my_collection", Collections.emptyMap());
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
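To illustrate the document-level rules (random IDs when none is given, "no IDs" meaning all documents, and the fallback to a default collection), here is an in-memory sketch. `Doc` and `InMemoryDocuments` are hypothetical names, and using `uuid4` for the random IDs is an assumption; actual stores may generate IDs differently.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union


@dataclass
class Doc:
    """Hypothetical document type used only by this sketch."""

    content: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    id: Optional[str] = None


class InMemoryDocuments:
    """Hypothetical sketch of add/get/delete semantics with a default collection."""

    def __init__(self, default_collection: str) -> None:
        self.default_collection = default_collection
        # collection name -> {document id -> document}
        self._data: Dict[str, Dict[str, Doc]] = {}

    def _bucket(self, collection_name: Optional[str]) -> Dict[str, Doc]:
        # No collection name given: fall back to the default collection.
        return self._data.setdefault(collection_name or self.default_collection, {})

    def add(self, documents: List[Doc], collection_name: Optional[str] = None) -> List[str]:
        bucket = self._bucket(collection_name)
        ids: List[str] = []
        for doc in documents:
            if doc.id is None:
                doc.id = uuid.uuid4().hex  # random ID for documents without one
            bucket[doc.id] = doc
            ids.append(doc.id)
        return ids

    def get(
        self, ids: Union[str, List[str], None] = None, collection_name: Optional[str] = None
    ) -> List[Doc]:
        bucket = self._bucket(collection_name)
        if ids is None:
            return list(bucket.values())  # no IDs: return every document
        id_list = [ids] if isinstance(ids, str) else ids
        return [bucket[i] for i in id_list if i in bucket]

    def delete(
        self, ids: Union[str, List[str], None] = None, collection_name: Optional[str] = None
    ) -> None:
        bucket = self._bucket(collection_name)
        if ids is None:
            bucket.clear()  # no IDs: delete every document
            return
        id_list = [ids] if isinstance(ids, str) else ids
        for i in id_list:
            bucket.pop(i, None)
```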
+
### Usage Example
Here's how to define and use vector stores in your agent:
@@ -396,27 +549,31 @@ Elasticsearch is currently supported in the Java API only. To use Elasticsearch
#### Prerequisites
1. An Elasticsearch cluster (version 8.0 or later for KNN support).
-2. An index with a `dense_vector` field.
#### ElasticsearchVectorStore Parameters
-| Parameter | Type | Default | Description |
-|-----------|------|---------|-------------|
-| `embedding_model` | str | Required | Reference to embedding model resource name |
-| `index` | str | Required | Target Elasticsearch index name |
-| `vector_field` | str | Required | Name of the dense vector field used for KNN |
-| `dims` | int | `768` | Vector dimensionality |
-| `k` | int | None | Number of nearest neighbors to return; can be overridden per query |
-| `num_candidates` | int | None | Candidate set size for ANN search; can be overridden per query |
-| `filter_query` | str | None | Raw JSON Elasticsearch filter query (DSL) applied as a post-filter |
-| `host` | str | `"http://localhost:9200"` | Elasticsearch endpoint |
-| `hosts` | str | None | Comma-separated list of Elasticsearch endpoints |
-| `username` | str | None | Username for basic authentication |
-| `password` | str | None | Password for basic authentication |
-| `api_key_base64` | str | None | Base64-encoded API key for authentication |
-| `api_key_id` | str | None | API key ID for authentication |
-| `api_key_secret` | str | None | API key secret for authentication |
+| Parameter | Type | Default | Description |
+|-------------------|------|---------------------------|--------------------------------------------------------------------|
+| `embedding_model` | str | Required | Reference to embedding model resource name |
+| `index` | str | None | Default target Elasticsearch index name |
+| `vector_field` | str | `"_vector"` | Name of the dense vector field used for KNN |
+| `dims` | int | `768` | Vector dimensionality |
+| `k` | int | None | Number of nearest neighbors to return; can be overridden per query |
+| `num_candidates` | int | None | Candidate set size for ANN search; can be overridden per query |
+| `filter_query` | str | None | Raw JSON Elasticsearch filter query (DSL) applied as a post-filter |
+| `host` | str | `"http://localhost:9200"` | Elasticsearch endpoint |
+| `hosts` | str | None | Comma-separated list of Elasticsearch endpoints |
+| `username` | str | None | Username for basic authentication |
+| `password` | str | None | Password for basic authentication |
+| `api_key_base64` | str | None | Base64-encoded API key for authentication |
+| `api_key_id` | str | None | API key ID for authentication |
+| `api_key_secret` | str | None | API key secret for authentication |
+
+{{< hint warning >}}
+For an index not created by Flink Agents, the index must have a `dense_vector` field, and the name of that field must be specified via `vector_field`.
+Such an index also cannot be accessed through collection-level operations, because Elasticsearch does not natively support storing index metadata.
+{{< /hint >}}
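As a rough illustration of how the retrieval parameters relate to Elasticsearch itself, the sketch below maps `vector_field`, `k`, `num_candidates`, and `filter_query` onto the `knn` and `post_filter` sections of an Elasticsearch 8 search request body, and splits `hosts` into a list. The helper names are hypothetical, and the precedence of `hosts` over `host` is an assumption; this is not the request that `ElasticsearchVectorStore` actually builds.

```python
import json
from typing import Any, Dict, List, Optional


def build_knn_search_body(
    query_vector: List[float],
    vector_field: str = "_vector",
    k: Optional[int] = None,
    num_candidates: Optional[int] = None,
    filter_query: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: map store parameters onto an Elasticsearch search body."""
    knn: Dict[str, Any] = {"field": vector_field, "query_vector": query_vector}
    # Only include the optional knobs that were actually configured.
    if k is not None:
        knn["k"] = k
    if num_candidates is not None:
        knn["num_candidates"] = num_candidates
    body: Dict[str, Any] = {"knn": knn}
    if filter_query is not None:
        # filter_query is raw JSON DSL; it is attached as a post_filter,
        # i.e. applied to the hits after the kNN search has run.
        body["post_filter"] = json.loads(filter_query)
    return body


def parse_hosts(host: str = "http://localhost:9200", hosts: Optional[str] = None) -> List[str]:
    """Hypothetical helper: assumes `hosts`, when set, overrides `host`."""
    if hosts:
        return [h.strip() for h in hosts.split(",") if h.strip()]
    return [host]
```

Because a post-filter runs after the nearest neighbors are selected, a selective filter can leave fewer than `k` results; filtering inside the `knn` clause itself would instead restrict the candidates during the search.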
#### Usage Example
{{< tabs "Elasticsearch Usage Example" >}}
@@ -579,11 +736,11 @@ public class MyAgent extends Agent {
The custom provider APIs are experimental and unstable, subject to
incompatible changes in future releases.
{{< /hint >}}
-If you want to use vector stores not offered by the built-in providers, you
can extend the base vector store class and implement your own! The vector store
system is built around the `BaseVectorStore` abstract class.
+If you want to use vector stores not offered by the built-in providers, you can extend the base vector store class and implement your own! The vector store system is built around the `BaseVectorStore` abstract class and the `CollectionManageableVectorStore` interface.
### BaseVectorStore
-The base class handles text-to-vector conversion and provides the high-level
query interface. You only need to implement the core vector search
functionality.
+The base class handles text-to-vector conversion and provides the high-level add and query interfaces. You only need to implement the core search functionality and the basic document-level operations.
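The division of labor described above resembles the template method pattern: public methods accept text, embed it, and hand vectors to abstract hooks. The following is a simplified sketch under that assumption. `SketchBaseVectorStore`, the `embed` callable, and the toy `RecordingStore` are hypothetical, and the real signatures in the examples on this page carry more parameters.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional


class SketchBaseVectorStore(ABC):
    """Hypothetical base class: owns text-to-vector conversion."""

    def __init__(self, embed: Callable[[str], List[float]]) -> None:
        # Stand-in for the configured embedding model resource.
        self._embed = embed

    def query(
        self, text: str, limit: int = 10, collection_name: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        # High-level API: embed the query text, then delegate the search.
        return self.query_embedding(self._embed(text), limit, collection_name)

    def add(self, texts: List[str], collection_name: Optional[str] = None) -> List[str]:
        # High-level API: embed each text, then delegate storage.
        docs = [{"content": t, "embedding": self._embed(t)} for t in texts]
        return self._add_embedding(docs, collection_name)

    @abstractmethod
    def query_embedding(
        self, embedding: List[float], limit: int, collection_name: Optional[str]
    ) -> List[Dict[str, Any]]:
        """Subclass hook: search with a pre-computed vector."""

    @abstractmethod
    def _add_embedding(
        self, documents: List[Dict[str, Any]], collection_name: Optional[str]
    ) -> List[str]:
        """Subclass hook: store documents that already carry embeddings."""


class RecordingStore(SketchBaseVectorStore):
    """Toy subclass: keeps documents in a list and returns the first `limit`."""

    def __init__(self, embed: Callable[[str], List[float]]) -> None:
        super().__init__(embed)
        self.docs: List[Dict[str, Any]] = []
        self.last_query: Optional[List[float]] = None

    def query_embedding(self, embedding, limit, collection_name):
        self.last_query = embedding  # the subclass only ever sees vectors
        return self.docs[:limit]

    def _add_embedding(self, documents, collection_name):
        start = len(self.docs)
        self.docs.extend(documents)
        return [str(i) for i in range(start, len(self.docs))]
```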
{{< tabs "Custom Vector Store" >}}
@@ -598,14 +755,95 @@ class MyVectorStore(BaseVectorStore):
        # Return vector store-specific configuration
        # These parameters are merged with query-specific parameters
        return {"index": "my_index", ...}
+
+    @override
+    def size(self, collection_name: str | None = None) -> int:
+        """Get the size of the collection in the vector store.
+
+        Args:
+            collection_name: The target collection. If not provided, use the
+                default collection.
+        """
+        size = ...
+        return size
+
+    @override
+    def get(
+        self,
+        ids: str | List[str] | None = None,
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> List[Document]:
+        """Retrieve documents from the vector store by their IDs.
+
+        Args:
+            ids: Unique identifiers of the documents to retrieve. If not
+                provided, get all documents.
+            collection_name: The collection name of the documents to retrieve.
+                If not provided, use the default collection.
+            **kwargs: Vector store-specific parameters (offset, limit, filter,
+                etc.)
+
+        Returns:
+            List of retrieved documents
+        """
+        documents: List[Document] = ...
+        return documents
+
+    @override
+    def delete(
+        self,
+        ids: str | List[str] | None = None,
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> None:
+        """Delete documents in the vector store by their IDs.
+
+        Args:
+            ids: Unique identifiers of the documents to delete. If not
+                provided, delete all documents.
+            collection_name: The collection name the documents belong to.
+                If not provided, use the default collection.
+            **kwargs: Vector store-specific parameters (filter, etc.)
+        """
+        # delete the documents
+        pass
+
+    @override
    def query_embedding(self, embedding: List[float], limit: int = 10, **kwargs: Any) -> List[Document]:
- # Core method: perform vector search using pre-computed embedding
- # - embedding: Pre-computed embedding vector for semantic search
- # - limit: Maximum number of results to return
- # - kwargs: Vector store-specific parameters
- # - Returns: List of Document objects matching the search criteria
- pass
+        """Perform vector search using pre-computed embedding.
+
+        Args:
+            embedding: Pre-computed embedding vector for semantic search
+            limit: Maximum number of results to return (default: 10)
+            collection_name: The collection to apply the query to.
+                If not provided, use the default collection.
+            **kwargs: Vector store-specific parameters (filters, distance
+                metrics, etc.)
+
+        Returns:
+            List of documents matching the search criteria
+        """
+        documents: List[Document] = ...
+        return documents
+
+    @override
+    def _add_embedding(
+        self,
+        *,
+        documents: List[Document],
+        collection_name: str | None = None,
+        **kwargs: Any,
+    ) -> List[str]:
+        """Add documents with pre-computed embeddings to the vector store.
+
+        Args:
+            documents: Documents with embeddings to add to the vector store
+            collection_name: The collection name of the documents to add.
+                If not provided, use the default collection.
+            **kwargs: Vector store-specific parameters (collection, namespace,
+                etc.)
+
+        Returns:
+            List of document IDs that were added to the vector store
+        """
+        # add the documents
+        ids: List[str] = ...
+        return ids
```
{{< /tab >}}
@@ -629,15 +867,193 @@ public class MyVectorStore extends BaseVectorStore {
kwargs.put("index", "my_index");
return kwargs;
}
+
+    /**
+     * Get the size of the collection in the vector store.
+     *
+     * @param collection The name of the collection to count. If null, count the default
+     *     collection.
+     * @return The number of documents in the collection.
+     */
+    @Override
+    public long size(@Nullable String collection) throws Exception {
+        long size = ...;
+        return size;
+    }
+
+    /**
+     * Retrieve documents from the vector store.
+     *
+     * @param ids The ids of the documents. If null, get all the documents or the first n
+     *     documents according to an implementation-specific limit.
+     * @param collection The name of the collection to be retrieved. If null, retrieve the
+     *     default collection.
+     * @param extraArgs Additional arguments.
+     * @return List of documents retrieved.
+     */
@Override
- public List<Document> queryEmbedding(float[] embedding, int limit,
Map<String, Object> args) {
- // Core method: perform vector search using pre-computed embedding
- // - embedding: Pre-computed embedding vector for semantic search
- // - limit: Maximum number of results to return
- // - args: Vector store-specific parameters
- // - Returns: List of Document objects matching the search criteria
- return null;
+    public List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
+            throws IOException {
+        List<Document> documents = ...;
+        return documents;
+    }
+
+    /**
+     * Delete documents in the vector store.
+     *
+     * @param ids The ids of the documents. If null, delete all the documents.
+     * @param collection The name of the collection the documents belong to. If null, use the
+     *     default collection.
+     * @param extraArgs Additional arguments.
+     */
+    @Override
+    public void delete(
+            @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs)
+            throws IOException {
+        // delete the documents
+    }
+
+    /**
+     * Performs vector search using a pre-computed embedding.
+     *
+     * @param embedding The embedding vector to search with
+     * @param limit Maximum number of results to return
+     * @param collection The collection to query. If null, query the default collection.
+     * @param args Additional arguments for the vector search
+     * @return List of documents matching the query embedding
+     */
+    @Override
+    protected List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, Map<String, Object> args) {
+        List<Document> documents = ...;
+        return documents;
+    }
+
+    /**
+     * Add documents with pre-computed embeddings to the vector store.
+     *
+     * @param documents The documents to be added.
+     * @param collection The name of the collection to add to. If null, add to the default
+     *     collection.
+     * @param extraArgs Additional arguments.
+     * @return IDs of the added documents.
+     */
+    @Override
+    protected List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs)
+            throws IOException {
+        // add the documents
+        List<String> ids = ...;
+        return ids;
+    }
+}
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
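For a store without a native vector index, `query_embedding` / `queryEmbedding` could in principle be implemented as a brute-force scan. This sketch ranks stored vectors by cosine similarity and returns the IDs of the top `limit` matches; the function names and the choice of cosine similarity are assumptions, since each backend uses its own index and distance metric.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def brute_force_query_embedding(
    embedding: Sequence[float],
    stored: List[Tuple[str, Sequence[float]]],
    limit: int = 10,
) -> List[str]:
    """Hypothetical helper: IDs of the `limit` stored vectors most similar to `embedding`."""
    # Score every stored vector, highest similarity first, then keep the top `limit`.
    ranked = sorted(stored, key=lambda item: cosine_similarity(embedding, item[1]), reverse=True)
    return [doc_id for doc_id, _ in ranked[:limit]]
```

A linear scan costs O(n) similarity computations per query, which is why production stores rely on approximate nearest-neighbor indexes instead.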
+
+### CollectionManageableVectorStore
+
+For vector stores that support collection-level operations, implement the following methods in addition.
+
+{{< tabs "Custom Vector Store support Collection" >}}
+
+{{< tab "Python" >}}
+
+```python
+class MyVectorStore(CollectionManageableVectorStore):
+    # Add your custom configuration fields here
+
+    # Implementation of the `BaseVectorStore` methods goes here.
+
+    @override
+    def get_or_create_collection(
+        self, name: str, metadata: Dict[str, Any] | None = None
+    ) -> Collection:
+        """Get a collection, or create it if it doesn't exist.
+
+        Args:
+            name: Name of the collection
+            metadata: Metadata of the collection
+        Returns:
+            The retrieved or created collection
+        """
+        collection: Collection = ...
+        return collection
+
+    @override
+    def get_collection(self, name: str) -> Collection:
+        """Get a collection, raising an exception if it doesn't exist.
+
+        Args:
+            name: Name of the collection
+        Returns:
+            The retrieved collection
+        """
+        collection: Collection = ...
+        return collection
+
+    @override
+    def delete_collection(self, name: str) -> Collection:
+        """Delete a collection.
+
+        Args:
+            name: Name of the collection
+        Returns:
+            The deleted collection
+        """
+        collection: Collection = ...
+        return collection
+```
+
+{{< /tab >}}
+
+{{< tab "Java" >}}
+
+```java
+public class MyVectorStore extends BaseVectorStore
+        implements CollectionManageableVectorStore {
+    // Add your custom configuration fields here
+
+    // Implementation of the `BaseVectorStore` methods goes here.
+
+    /**
+     * Get a collection, or create it if it doesn't exist.
+     *
+     * @param name The name of the collection to get or create.
+     * @param metadata The metadata of the collection.
+     * @return The retrieved or created collection.
+     */
+    @Override
+    public Collection getOrCreateCollection(String name, Map<String, Object> metadata)
+            throws Exception {
+        Collection collection = ...;
+        return collection;
+    }
+
+    /**
+     * Get a collection by name.
+     *
+     * @param name The name of the collection to get.
+     * @return The retrieved collection.
+     */
+    @Override
+    public Collection getCollection(String name) throws Exception {
+        Collection collection = ...;
+        return collection;
+    }
+
+    /**
+     * Delete a collection by name.
+     *
+     * @param name The name of the collection to delete.
+     * @return The deleted collection.
+     */
+    @Override
+    public Collection deleteCollection(String name) throws Exception {
+        Collection collection = ...;
+        return collection;
}
}
```
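Because only some stores implement `CollectionManageableVectorStore`, agent code may want to check that capability before calling collection-level operations. The sketch below expresses such a check with a `typing.Protocol`; `SupportsCollections`, `ensure_collection`, and the two toy stores are hypothetical, and with the real library an `isinstance` check against `CollectionManageableVectorStore` would presumably play the same role.

```python
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@runtime_checkable
class SupportsCollections(Protocol):
    """Hypothetical structural stand-in for CollectionManageableVectorStore."""

    def get_or_create_collection(
        self, name: str, metadata: Optional[Dict[str, Any]] = None
    ) -> Any: ...

    def get_collection(self, name: str) -> Any: ...

    def delete_collection(self, name: str) -> Any: ...


def ensure_collection(store: object, name: str) -> Any:
    """Hypothetical helper: fail fast if the store cannot manage collections."""
    # runtime_checkable only checks that the methods exist, not their signatures.
    if not isinstance(store, SupportsCollections):
        raise TypeError(f"{type(store).__name__} does not support collection-level operations")
    return store.get_or_create_collection(name)


class PlainStore:
    """Toy store without collection support."""


class DictBackedStore:
    """Toy store that keeps collection metadata in a dict."""

    def __init__(self) -> None:
        self.collections: Dict[str, Dict[str, Any]] = {}

    def get_or_create_collection(self, name, metadata=None):
        return self.collections.setdefault(name, dict(metadata or {}))

    def get_collection(self, name):
        return self.collections[name]

    def delete_collection(self, name):
        return self.collections.pop(name)
```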