wenjin272 commented on code in PR #663: URL: https://github.com/apache/flink-agents/pull/663#discussion_r3233575344
########## docs/content/docs/development/vector_stores.md: ########## @@ -41,6 +41,7 @@ In Flink Agents, vector stores are essential for: ### Concepts * **Document**: Document is the abstraction that represents a piece of text and associated metadata. A document may also carry a pre-computed `embedding` vector and a `score` populated by query results. +* **Collection**: Collection is the abstraction that represents a set of documents. It corresponds to different concept for different vector store specification, like index in Elasticsearch/OpenSearch and collection in Chroma/Milvus.

Review Comment: It looks like there were some conflicts during the rebase. In a previous PR, we removed the `Collection` data structure and this line of description.

########## integrations/vector-stores/milvus/src/main/java/org/apache/flink/agents/integrations/vectorstores/milvus/MilvusVectorStore.java: ########## @@ -0,0 +1,1185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.agents.integrations.vectorstores.milvus; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.common.ConsistencyLevel; +import io.milvus.v2.common.DataType; +import io.milvus.v2.common.IndexParam; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.DropCollectionReq; +import io.milvus.v2.service.collection.request.GetLoadStateReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.collection.request.LoadCollectionReq; +import io.milvus.v2.service.vector.request.DeleteReq; +import io.milvus.v2.service.vector.request.InsertReq; +import io.milvus.v2.service.vector.request.QueryReq; +import io.milvus.v2.service.vector.request.SearchReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import io.milvus.v2.service.vector.request.data.BaseVector; +import io.milvus.v2.service.vector.request.data.FloatVec; +import io.milvus.v2.service.vector.response.QueryResp; +import io.milvus.v2.service.vector.response.SearchResp; +import org.apache.flink.agents.api.resource.ResourceContext; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.vectorstores.BaseVectorStore; +import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +/** + * Milvus-backed implementation of a vector store. 
+ * + * <p>This implementation executes dense-vector similarity search against a Milvus collection. It + * integrates with an embedding model (configured via the {@code embedding_model} resource argument + * inherited from {@link BaseVectorStore}) to convert query text into embeddings and then performs + * vector search using Milvus' search API. + * + * <p>The store creates collections with a simple dense-vector schema: + * + * <ul> + * <li>{@code id}: VarChar primary key + * <li>{@code content}: VarChar document content + * <li>{@code metadata}: JSON metadata map + * <li>{@code embedding}: FloatVector + * </ul> + * + * <p>Configuration is provided through {@link ResourceDescriptor} arguments. The most relevant ones + * are: + * + * <ul> + * <li>{@code collection} or {@code index} (optional): Target collection name. If omitted, + * defaults to {@link #DEFAULT_COLLECTION}. + * <li>{@code dims} (optional): Vector dimensionality; defaults to {@link #DEFAULT_DIMENSION}. + * <li>{@code vector_field}, {@code content_field}, {@code metadata_field}, {@code id_field} + * (optional): Schema field names. + * <li>{@code metric_type} (optional): Milvus metric type; defaults to {@code COSINE}. + * <li>{@code index_type} and {@code index_params} (optional): Milvus vector index settings. + * <li>{@code metadata_index_keys} (optional): Additional top-level metadata keys to index as JSON + * path indexes. The default keys are {@code user_id}, {@code agent_id}, {@code run_id}, + * {@code actor_id}, and {@code category}. + * <li>{@code metadata_index_cast_types} (optional): Map from metadata key to Milvus JSON index + * cast type. Defaults to {@code VARCHAR}; use values such as {@code DOUBLE} for numeric + * metadata keys. + * <li>{@code num_shards} (optional): Number of Milvus shards to create with the collection; + * defaults to {@link #DEFAULT_NUM_SHARDS}. 
As a rough capacity-planning rule, use about one + * shard per 100 million vectors, and increase it for heavier write throughput. + * <li>{@code consistency_level} (optional): Milvus consistency level for query and search; + * defaults to {@code BOUNDED}. Use {@code STRONG} when immediate read-after-write visibility + * is required. + * <li>{@code load_timeout_ms} (optional): Timeout used when loading a collection from {@link + * #createCollectionIfNotExists(String, Map)}; defaults to {@link #DEFAULT_LOAD_TIMEOUT_MS}. + * <li>{@code uri}, or {@code host}/{@code port} (optional): Milvus endpoint. If omitted, defaults + * to {@code http://localhost:19530}. + * <li>Authentication (optional): Either token auth via {@code token}, or basic auth via {@code + * username}/{@code password}. + * </ul> + * + * <p>Example usage: + * + * <pre>{@code + * ResourceDescriptor desc = ResourceDescriptor.Builder + * .newBuilder(MilvusVectorStore.class.getName()) + * .addInitialArgument("embedding_model", "textEmbedder") + * .addInitialArgument("uri", "http://localhost:19530") + * .addInitialArgument("collection", "my_documents") + * .addInitialArgument("dims", 768) + * .addInitialArgument("metric_type", "COSINE") + * .addInitialArgument("index_type", "AUTOINDEX") + * .build(); + * }</pre> + */ +public class MilvusVectorStore extends BaseVectorStore implements CollectionManageableVectorStore { + + /** + * Default collection name used when {@code collection}, {@code collection_name}, and {@code + * index} are omitted. + */ + public static final String DEFAULT_COLLECTION = "flink_agents_milvus_collection"; + /** Default primary key field name. */ + public static final String DEFAULT_ID_FIELD = "id"; + /** Default field name used to store document content. */ + public static final String DEFAULT_CONTENT_FIELD = "content"; + /** Default JSON field name used to store document metadata. 
*/ + public static final String DEFAULT_METADATA_FIELD = "metadata"; + /** Default FloatVector field name on which Milvus search is executed. */ + public static final String DEFAULT_VECTOR_FIELD = "embedding"; + /** Default index name for the full metadata JSON index. */ + public static final String DEFAULT_METADATA_INDEX_NAME = "metadata_json_index"; + /** Metadata keys that are commonly used by Mem0 and vector-store filter callers. */ + public static final List<String> DEFAULT_METADATA_INDEX_KEYS = + List.of("user_id", "agent_id", "run_id", "actor_id", "category"); + /** Default Milvus JSON cast type used for metadata path indexes. */ + public static final String DEFAULT_METADATA_INDEX_CAST_TYPE = "VARCHAR"; + /** Default vector dimensionality used when {@code dims} is not provided. */ + public static final int DEFAULT_DIMENSION = 768; + /** The maximum number of documents that can be retrieved by get when limit is omitted. */ + public static final int DEFAULT_MAX_GET_LIMIT = 10000; + /** Default maximum length for the VarChar primary key field. */ + public static final int DEFAULT_ID_MAX_LENGTH = 65535; + /** Default maximum length for the VarChar content field. */ + public static final int DEFAULT_CONTENT_MAX_LENGTH = 65535; + /** Default number of Milvus shards used when creating a collection. */ + public static final int DEFAULT_NUM_SHARDS = 1; + /** Default timeout for synchronous collection load operations. */ + public static final long DEFAULT_LOAD_TIMEOUT_MS = 120000L; + + /** Milvus connection configuration built from the resource descriptor. */ + private final ConnectConfig connectConfig; + /** Lazily-created Milvus client used to execute collection and vector requests. */ + private transient @Nullable MilvusClientV2 client; + + private final Gson gson = new Gson(); + + /** Resolved Milvus endpoint URI. */ + private final String uri; + /** Optional Milvus database name. 
*/ + private final @Nullable String databaseName; + /** Default collection name used when a per-call collection is not supplied. */ + private final String defaultCollection; + /** Name of the primary key field. */ + private final String idField; + /** Name of the content field to store the document content. */ + private final String contentField; + /** Name of the JSON field to store document metadata. */ + private final String metadataField; + /** Name of the FloatVector field on which vector search is executed. */ + private final String vectorField; + /** Vector dimensionality of the {@link #vectorField}. */ + private final int dims; + /** Default query limit used by get when no limit is provided. */ + private final int maxGetLimit; + /** Maximum length for the VarChar primary key field. */ + private final int idMaxLength; + /** Maximum length for the VarChar content field. */ + private final int contentMaxLength; + /** Default Milvus metric type used for collection indexes and search. */ + private final IndexParam.MetricType metricType; + /** Default Milvus index type used when creating collections. */ + private final IndexParam.IndexType indexType; + /** Extra index parameters passed to Milvus collection creation. */ + private final Map<String, Object> indexParams; + /** Metadata JSON keys indexed with path-specific indexes during collection creation. */ + private final List<String> metadataIndexKeys; + /** Per-metadata-key JSON cast type overrides for path-specific indexes. */ + private final Map<String, String> metadataIndexCastTypes; + /** Number of shards used when creating collections. */ + private final int numShards; + /** Consistency level used for collection creation, query, and search requests. */ + private final ConsistencyLevel consistencyLevel; + /** Timeout used when loading collections from create-collection paths. 
*/ + private final long loadTimeoutMs; + /** + * Creates a new {@code MilvusVectorStore} from the provided descriptor and resource resolver. + * + * <p>The constructor reads connection, authentication, schema, index, and query defaults from + * the descriptor and prepares a {@link ConnectConfig}. The Milvus client itself is created + * lazily on first use. + * + * @param descriptor Resource descriptor containing configuration arguments + * @param resourceContext Context used to resolve other resources by name and type + */ + public MilvusVectorStore(ResourceDescriptor descriptor, ResourceContext resourceContext) { + super(descriptor, resourceContext); + + this.uri = resolveUri(descriptor); + this.databaseName = stringArg(descriptor, "db_name", null); + this.defaultCollection = + stringArg( + descriptor, + "collection", + stringArg( + descriptor, + "collection_name", + stringArg(descriptor, "index", DEFAULT_COLLECTION))); + this.idField = stringArg(descriptor, "id_field", DEFAULT_ID_FIELD); + this.contentField = stringArg(descriptor, "content_field", DEFAULT_CONTENT_FIELD); + this.metadataField = stringArg(descriptor, "metadata_field", DEFAULT_METADATA_FIELD); + this.vectorField = stringArg(descriptor, "vector_field", DEFAULT_VECTOR_FIELD); + this.dims = intArg(descriptor, "dims", DEFAULT_DIMENSION); + this.maxGetLimit = intArg(descriptor, "max_get_limit", DEFAULT_MAX_GET_LIMIT); + this.idMaxLength = intArg(descriptor, "id_max_length", DEFAULT_ID_MAX_LENGTH); + this.contentMaxLength = + intArg(descriptor, "content_max_length", DEFAULT_CONTENT_MAX_LENGTH); + this.metricType = + enumArg( + IndexParam.MetricType.class, + stringArg(descriptor, "metric_type", IndexParam.MetricType.COSINE.name())); + this.indexType = + enumArg( + IndexParam.IndexType.class, + stringArg(descriptor, "index_type", IndexParam.IndexType.AUTOINDEX.name())); + this.indexParams = mapArg(descriptor, "index_params"); + this.metadataIndexCastTypes = metadataIndexCastTypesArg(descriptor); + 
this.metadataIndexKeys = metadataIndexKeysArg(descriptor, this.metadataIndexCastTypes); + this.numShards = intArg(descriptor, "num_shards", DEFAULT_NUM_SHARDS); + this.consistencyLevel = + enumArg( + ConsistencyLevel.class, + stringArg( + descriptor, "consistency_level", ConsistencyLevel.BOUNDED.name())); + this.loadTimeoutMs = longArg(descriptor, "load_timeout_ms", DEFAULT_LOAD_TIMEOUT_MS); + + ConnectConfig.ConnectConfigBuilder builder = + ConnectConfig.builder() + .uri(this.uri) + .secure(this.uri.startsWith("https://")) + .enablePrecheck(booleanArg(descriptor, "enable_precheck", false)); + + String token = stringArg(descriptor, "token", null); + if (token != null && !token.isEmpty()) { + builder.token(token); + } + String username = stringArg(descriptor, "username", null); + String password = stringArg(descriptor, "password", null); + if (username != null && password != null) { + builder.username(username).password(password); + } + if (this.databaseName != null) { + builder.dbName(this.databaseName); + } + + this.connectConfig = builder.build(); + } + + @Override + public void close() { + if (this.client != null) { + this.client.close(); + this.client = null; + } + } + + /** + * Returns default store-level arguments collected from the descriptor. + * + * <p>The returned map can be merged with per-query arguments to form the complete set of + * parameters for Milvus collection creation, retrieval, and vector search operations. + * + * @return map of default store arguments such as {@code uri}, {@code collection}, {@code + * vector_field}, {@code dims}, {@code metric_type}, {@code index_type}, and {@code + * num_shards}. 
+ */ + @Override + public Map<String, Object> getStoreKwargs() { + Map<String, Object> kwargs = new HashMap<>(); + kwargs.put("uri", this.uri); + kwargs.put("collection", this.defaultCollection); + kwargs.put("index", this.defaultCollection); + kwargs.put("id_field", this.idField); + kwargs.put("content_field", this.contentField); + kwargs.put("metadata_field", this.metadataField); + kwargs.put("vector_field", this.vectorField); + kwargs.put("dims", this.dims); + kwargs.put("metric_type", this.metricType.name()); + kwargs.put("index_type", this.indexType.name()); + kwargs.put("index_params", new HashMap<>(this.indexParams)); + kwargs.put("metadata_index_keys", new ArrayList<>(this.metadataIndexKeys)); + kwargs.put("metadata_index_cast_types", new HashMap<>(this.metadataIndexCastTypes)); + kwargs.put("num_shards", this.numShards); + kwargs.put("consistency_level", this.consistencyLevel.name()); + kwargs.put("load_timeout_ms", this.loadTimeoutMs); + if (this.databaseName != null) { + kwargs.put("db_name", this.databaseName); + } + return kwargs; + } + + /** Returns the lazily-created Milvus client. */ + private MilvusClientV2 client() {

Review Comment: Since flink-agents supports asynchronous execution of actions, multiple threads can invoke the same vector store simultaneously. If several threads call `client()` while the client has not yet been created, each of them may create its own `MilvusClientV2`, and all but one of those connections would leak. I suggest adding double-checked locking (DCL) here.

########## tools/docker/elasticsearch/docker-compose.yml: ##########

Review Comment: It appears that this file is not being used in the CI. Could we also align the current Elasticsearch startup process in the CI with the one used for Milvus?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
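A minimal sketch of the double-checked locking suggested for `client()` in `MilvusVectorStore`, assuming the check-then-create lazy initialization implied by the `transient @Nullable MilvusClientV2 client` field. `LazyClientHolder` and its `factory`/`closer` parameters are hypothetical stand-ins so the sketch runs without a Milvus server; in the store, the factory would presumably be `() -> new MilvusClientV2(connectConfig)` and the closer `MilvusClientV2::close`. The two essentials are the `volatile` field, which ensures safe publication, and a second null check inside the lock. `close()` takes the same lock so it cannot interleave with creation.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Hypothetical holder illustrating double-checked locking (DCL) for a lazily created client.
 * In MilvusVectorStore, C would be MilvusClientV2.
 */
final class LazyClientHolder<C> {

    private final Supplier<C> factory; // builds a new client, e.g. from a ConnectConfig
    private final Consumer<C> closer;  // releases a client's connection

    // volatile: a thread that reads a non-null reference also sees the
    // client's fully initialized state (safe publication).
    private volatile C client;

    public LazyClientHolder(Supplier<C> factory, Consumer<C> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** Returns the shared client, creating it at most once even when called concurrently. */
    public C client() {
        C local = client;           // first check: lock-free fast path
        if (local == null) {
            synchronized (this) {
                local = client;     // second check: another thread may have created it already
                if (local == null) {
                    local = factory.get();
                    client = local; // publish through the volatile write
                }
            }
        }
        return local;
    }

    /** Closes the current client, if any, under the same lock used for creation. */
    public synchronized void close() {
        C local = client;
        client = null;
        if (local != null) {
            closer.accept(local);
        }
    }
}
```

As with the existing `close()`, which resets the field to `null`, a call to `client()` after `close()` lazily creates a fresh client rather than failing.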
