This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit ea6db461399164cded131fc80a28af187f6a5aa0 Author: youjin <[email protected]> AuthorDate: Wed Jan 14 09:30:29 2026 +0800 [Feature][runtime] Support the use of Python VectorStore in Java --- .../api/resource/python/PythonResourceAdapter.java | 45 +++ .../api/vectorstores/VectorStoreQueryMode.java | 32 +- .../PythonCollectionManageableVectorStore.java | 85 +++++ .../api/vectorstores/python/PythonVectorStore.java | 156 +++++++++ .../PythonCollectionManageableVectorStoreTest.java | 354 +++++++++++++++++++++ .../pom.xml | 5 + .../test/VectorStoreCrossLanguageAgent.java | 239 ++++++++++++++ .../test/VectorStoreCrossLanguageTest.java | 102 ++++++ .../resourceprovider/PythonResourceProvider.java | 4 +- .../apache/flink/agents/plan/AgentPlanTest.java | 31 ++ python/flink_agents/runtime/python_java_utils.py | 35 ++ .../python/utils/PythonResourceAdapterImpl.java | 57 ++++ 12 files changed, 1141 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java index 89652ae..43f2fbe 100644 --- a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java +++ b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java @@ -20,8 +20,13 @@ package org.apache.flink.agents.api.resource.python; import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.tools.Tool; +import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.api.vectorstores.VectorStoreQuery; +import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult; import pemja.core.object.PyObject; +import java.util.List; import java.util.Map; /** @@ -66,6 +71,46 @@ public interface PythonResourceAdapter { */ ChatMessage fromPythonChatMessage(Object 
pythonChatMessage); + /** + * Converts a list of Java Document objects to their Python equivalent. + * + * @param documents the list of Java Documents to convert + * @return the Python representation of the documents + */ + Object toPythonDocuments(List<Document> documents); + + /** + * Converts a list of Python Document objects back to a list of Java Documents. + * + * @param pythonDocuments the list of Python Document objects to convert + * @return the equivalent list of Java Documents + */ + List<Document> fromPythonDocuments(List<PyObject> pythonDocuments); + + /** + * Converts a Java VectorStoreQuery object to its Python equivalent. + * + * @param query the Java VectorStoreQuery to convert + * @return the Python representation of the vector store query + */ + Object toPythonVectorStoreQuery(VectorStoreQuery query); + + /** + * Converts a Python VectorStoreQueryResult object back to a Java VectorStoreQueryResult. + * + * @param pythonVectorStoreQueryResult the Python VectorStoreQueryResult object to convert + * @return the Java VectorStoreQueryResult representation + */ + VectorStoreQueryResult fromPythonVectorStoreQueryResult(PyObject pythonVectorStoreQueryResult); + + /** + * Converts a Python Collection object back to a Java Collection. + * + * @param pythonCollection the Python Collection object to convert + * @return the Java Collection representation + */ + CollectionManageableVectorStore.Collection fromPythonCollection(PyObject pythonCollection); + /** * Converts a Java Tool object to its Python equivalent.
* diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java index 07cd35e..8e06200 100644 --- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java @@ -27,9 +27,35 @@ package org.apache.flink.agents.api.vectorstores; */ public enum VectorStoreQueryMode { /** Semantic similarity search using embeddings. */ - SEMANTIC, + SEMANTIC("semantic"); /** Keyword/lexical search (store dependent). TODO: term-based retrieval */ - // KEYWORD, + // KEYWORD("keyword"), /** Hybrid search combining semantic and keyword results. TODO: semantic + keyword retrieval */ - // HYBRID; + // HYBRID("hybrid"); + + private final String value; + + VectorStoreQueryMode(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + /** + * Get VectorStoreQueryMode from string value. 
+ * + * @param value the string value + * @return the corresponding VectorStoreQueryMode + * @throws IllegalArgumentException if no matching VectorStoreQueryMode is found + */ + public static VectorStoreQueryMode fromValue(String value) { + for (VectorStoreQueryMode type : VectorStoreQueryMode.values()) { + if (type.value.equals(value)) { + return type; + } + } + throw new IllegalArgumentException("Unknown VectorStoreQueryMode value: " + value); + } } diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStore.java new file mode 100644 index 0000000..f58d108 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStore.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.agents.api.vectorstores.python; + +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.resource.python.PythonResourceAdapter; +import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore; +import pemja.core.object.PyObject; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Python-based implementation of VectorStore with collection management capabilities that bridges + * Java and Python vector store functionality. This class wraps a Python vector store object and + * provides Java interface compatibility while delegating actual storage, retrieval, and collection + * management operations to the underlying Python implementation. + * + * <p>Unlike {@link PythonVectorStore}, this implementation provides additional collection + * management features, allowing for operations such as creating, retrieving, and deleting collections + * within the vector store. + */ +public class PythonCollectionManageableVectorStore extends PythonVectorStore + implements CollectionManageableVectorStore { + /** + * Creates a new PythonCollectionManageableVectorStore.
+ * + * @param adapter The Python resource adapter used to invoke Python methods and to convert + * collection objects returned by Python into their Java representation + * @param vectorStore The Python vector store object + * @param descriptor The resource descriptor + * @param getResource Function to retrieve resources by name and type + */ + public PythonCollectionManageableVectorStore( + PythonResourceAdapter adapter, + PyObject vectorStore, + ResourceDescriptor descriptor, + BiFunction<String, ResourceType, Resource> getResource) { + super(adapter, vectorStore, descriptor, getResource); + } + + @Override + public Collection getOrCreateCollection(String name, Map<String, Object> metadata) + throws Exception { + Map<String, Object> kwargs = new HashMap<>(); + kwargs.put("name", name); + if (metadata != null && !metadata.isEmpty()) { + kwargs.put("metadata", metadata); + } + + Object result = this.adapter.callMethod(vectorStore, "get_or_create_collection", kwargs); + return adapter.fromPythonCollection((PyObject) result); + } + + @Override + public Collection getCollection(String name) throws Exception { + Object result = this.vectorStore.invokeMethod("get_collection", name); + return adapter.fromPythonCollection((PyObject) result); + } + + @Override + public Collection deleteCollection(String name) throws Exception { + Object result = this.vectorStore.invokeMethod("delete_collection", name); + return adapter.fromPythonCollection((PyObject) result); + } +} diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java new file mode 100644 index 0000000..c570ad5 --- /dev/null +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.vectorstores.python; + +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.resource.python.PythonResourceAdapter; +import org.apache.flink.agents.api.resource.python.PythonResourceWrapper; +import org.apache.flink.agents.api.vectorstores.BaseVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.api.vectorstores.VectorStoreQuery; +import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult; +import pemja.core.object.PyObject; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Python-based implementation of VectorStore that bridges Java and Python vector store + * functionality. This class wraps a Python vector store object and provides Java interface + * compatibility while delegating actual storage and retrieval operations to the underlying Python + * implementation. 
+ * + * <p>This class serves as a connection layer between Java and Python vector store environments, + * enabling seamless integration of Python-based vector stores within Java applications. + */ +public class PythonVectorStore extends BaseVectorStore implements PythonResourceWrapper { + protected final PyObject vectorStore; + protected final PythonResourceAdapter adapter; + + /** + * Creates a new PythonVectorStore. + * + * @param adapter The Python resource adapter used to invoke Python methods and to convert + * documents, queries and query results between their Java and Python representations + * @param vectorStore The Python vector store object + * @param descriptor The resource descriptor + * @param getResource Function to retrieve resources by name and type + */ + public PythonVectorStore( + PythonResourceAdapter adapter, + PyObject vectorStore, + ResourceDescriptor descriptor, + BiFunction<String, ResourceType, Resource> getResource) { + super(descriptor, getResource); + this.vectorStore = vectorStore; + this.adapter = adapter; + } + + @Override + @SuppressWarnings("unchecked") + public List<String> add( + List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs) + throws IOException { + Object pythonDocuments = adapter.toPythonDocuments(documents); + + Map<String, Object> kwargs = new HashMap<>(extraArgs); + kwargs.put("documents", pythonDocuments); + + if (collection != null) { + kwargs.put("collection", collection); + } + + return (List<String>) adapter.callMethod(vectorStore, "add", kwargs); + } + + @Override + public VectorStoreQueryResult query(VectorStoreQuery query) { + Object pythonQuery = adapter.toPythonVectorStoreQuery(query); + + PyObject pythonResult = (PyObject) vectorStore.invokeMethod("query", pythonQuery); + + return adapter.fromPythonVectorStoreQueryResult(pythonResult); + } + + @Override + public long size(@Nullable String collection) throws Exception { + return (long) vectorStore.invokeMethod("size",
collection); + } + + @Override + public List<Document> get( + @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs) + throws IOException { + Map<String, Object> kwargs = new HashMap<>(extraArgs); + if (ids != null && !ids.isEmpty()) { + kwargs.put("ids", ids); + } + if (collection != null) { + kwargs.put("collection", collection); + } + + Object pythonDocuments = adapter.callMethod(vectorStore, "get", kwargs); + + return adapter.fromPythonDocuments((List<PyObject>) pythonDocuments); + } + + @Override + public void delete( + @Nullable List<String> ids, @Nullable String collection, Map<String, Object> extraArgs) + throws IOException { + Map<String, Object> kwargs = new HashMap<>(extraArgs); + if (ids != null && !ids.isEmpty()) { + kwargs.put("ids", ids); + } + if (collection != null) { + kwargs.put("collection", collection); + } + adapter.callMethod(vectorStore, "delete", kwargs); + } + + @Override + public Map<String, Object> getStoreKwargs() { + return Map.of(); + } + + @Override + protected List<Document> queryEmbedding( + float[] embedding, int limit, @Nullable String collection, Map<String, Object> args) { + return List.of(); + } + + @Override + protected List<String> addEmbedding( + List<Document> documents, @Nullable String collection, Map<String, Object> extraArgs) + throws IOException { + return List.of(); + } + + @Override + public Object getPythonResource() { + return vectorStore; + } +} diff --git a/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java b/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java new file mode 100644 index 0000000..d92b716 --- /dev/null +++ b/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.agents.api.vectorstores.python; + +import org.apache.flink.agents.api.resource.Resource; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.resource.python.PythonResourceAdapter; +import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import pemja.core.object.PyObject; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PythonCollectionManageableVectorStoreTest { + @Mock private PythonResourceAdapter mockAdapter; + + @Mock private PyObject mockVectorStore; + + @Mock 
private ResourceDescriptor mockDescriptor; + + @Mock private BiFunction<String, ResourceType, Resource> mockGetResource; + + @Mock private PyObject mockPythonCollection; + + private PythonCollectionManageableVectorStore vectorStore; + private AutoCloseable mocks; + + @BeforeEach + void setUp() throws Exception { + mocks = MockitoAnnotations.openMocks(this); + vectorStore = + new PythonCollectionManageableVectorStore( + mockAdapter, mockVectorStore, mockDescriptor, mockGetResource); + } + + @AfterEach + void tearDown() throws Exception { + if (mocks != null) { + mocks.close(); + } + } + + @Test + void testConstructor() { + assertThat(vectorStore).isNotNull(); + assertThat(vectorStore.getPythonResource()).isEqualTo(mockVectorStore); + } + + @Test + void testGetPythonResourceWithNullVectorStore() { + PythonCollectionManageableVectorStore storeWithNull = + new PythonCollectionManageableVectorStore( + mockAdapter, null, mockDescriptor, mockGetResource); + + Object result = storeWithNull.getPythonResource(); + + assertThat(result).isNull(); + } + + @Test + void testGetOrCreateCollectionWithMetadata() throws Exception { + String collectionName = "test_collection"; + Map<String, Object> metadata = new HashMap<>(); + metadata.put("key1", "value1"); + metadata.put("key2", "value2"); + + CollectionManageableVectorStore.Collection expectedCollection = + new CollectionManageableVectorStore.Collection(collectionName, metadata); + + when(mockAdapter.callMethod( + eq(mockVectorStore), eq("get_or_create_collection"), any(Map.class))) + .thenReturn(mockPythonCollection); + when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection); + + CollectionManageableVectorStore.Collection result = + vectorStore.getOrCreateCollection(collectionName, metadata); + + assertThat(result).isNotNull(); + assertThat(result.getName()).isEqualTo(collectionName); + assertThat(result.getMetadata()).isEqualTo(metadata); + + verify(mockAdapter) + .callMethod( + 
eq(mockVectorStore), + eq("get_or_create_collection"), + argThat( + kwargs -> { + assertThat(kwargs).containsKey("name"); + assertThat(kwargs).containsKey("metadata"); + assertThat(kwargs.get("name")).isEqualTo(collectionName); + assertThat(kwargs.get("metadata")).isEqualTo(metadata); + return true; + })); + } + + @Test + void testGetOrCreateCollectionWithEmptyMetadata() throws Exception { + String collectionName = "test_collection"; + Map<String, Object> metadata = new HashMap<>(); + + CollectionManageableVectorStore.Collection expectedCollection = + new CollectionManageableVectorStore.Collection(collectionName, metadata); + + when(mockAdapter.callMethod( + eq(mockVectorStore), eq("get_or_create_collection"), any(Map.class))) + .thenReturn(mockPythonCollection); + when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection); + + CollectionManageableVectorStore.Collection result = + vectorStore.getOrCreateCollection(collectionName, metadata); + + assertThat(result).isNotNull(); + assertThat(result.getName()).isEqualTo(collectionName); + + verify(mockAdapter) + .callMethod( + eq(mockVectorStore), + eq("get_or_create_collection"), + argThat( + kwargs -> { + assertThat(kwargs).containsKey("name"); + assertThat(kwargs).doesNotContainKey("metadata"); + return true; + })); + } + + @Test + void testGetOrCreateCollectionWithNullMetadata() throws Exception { + String collectionName = "test_collection"; + + CollectionManageableVectorStore.Collection expectedCollection = + new CollectionManageableVectorStore.Collection(collectionName, null); + + when(mockAdapter.callMethod( + eq(mockVectorStore), eq("get_or_create_collection"), any(Map.class))) + .thenReturn(mockPythonCollection); + when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection); + + CollectionManageableVectorStore.Collection result = + vectorStore.getOrCreateCollection(collectionName, null); + + assertThat(result).isNotNull(); + 
assertThat(result.getName()).isEqualTo(collectionName); + + verify(mockAdapter) + .callMethod( + eq(mockVectorStore), + eq("get_or_create_collection"), + argThat( + kwargs -> { + assertThat(kwargs).containsKey("name"); + assertThat(kwargs).doesNotContainKey("metadata"); + return true; + })); + } + + @Test + void testGetCollection() throws Exception { + String collectionName = "existing_collection"; + Map<String, Object> metadata = Map.of("type", "test"); + + CollectionManageableVectorStore.Collection expectedCollection = + new CollectionManageableVectorStore.Collection(collectionName, metadata); + + when(mockVectorStore.invokeMethod("get_collection", collectionName)) + .thenReturn(mockPythonCollection); + when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection); + + CollectionManageableVectorStore.Collection result = + vectorStore.getCollection(collectionName); + + assertThat(result).isNotNull(); + assertThat(result.getName()).isEqualTo(collectionName); + assertThat(result.getMetadata()).isEqualTo(metadata); + + verify(mockVectorStore).invokeMethod("get_collection", collectionName); + verify(mockAdapter).fromPythonCollection(mockPythonCollection); + } + + @Test + void testDeleteCollection() throws Exception { + String collectionName = "collection_to_delete"; + Map<String, Object> metadata = Map.of("status", "deleted"); + + CollectionManageableVectorStore.Collection expectedCollection = + new CollectionManageableVectorStore.Collection(collectionName, metadata); + + when(mockVectorStore.invokeMethod("delete_collection", collectionName)) + .thenReturn(mockPythonCollection); + when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection); + + CollectionManageableVectorStore.Collection result = + vectorStore.deleteCollection(collectionName); + + assertThat(result).isNotNull(); + assertThat(result.getName()).isEqualTo(collectionName); + + verify(mockVectorStore).invokeMethod("delete_collection", 
collectionName); + verify(mockAdapter).fromPythonCollection(mockPythonCollection); + } + + @Test + void testAddDocuments() throws Exception { + List<Document> documents = + Arrays.asList( + new Document("content1", Map.of("key", "value1"), "doc1"), + new Document("content2", Map.of("key", "value2"), "doc2")); + String collection = "test_collection"; + Map<String, Object> extraArgs = Map.of("batch_size", 10); + + List<String> expectedIds = Arrays.asList("doc1", "doc2"); + + when(mockAdapter.toPythonDocuments(documents)).thenReturn(new Object()); + when(mockAdapter.callMethod(eq(mockVectorStore), eq("add"), any(Map.class))) + .thenReturn(expectedIds); + + List<String> result = vectorStore.add(documents, collection, extraArgs); + + assertThat(result).isNotNull(); + assertThat(result).hasSize(2); + assertThat(result).containsExactly("doc1", "doc2"); + + verify(mockAdapter).toPythonDocuments(documents); + verify(mockAdapter) + .callMethod( + eq(mockVectorStore), + eq("add"), + argThat( + kwargs -> { + assertThat(kwargs).containsKey("documents"); + assertThat(kwargs).containsKey("collection"); + assertThat(kwargs).containsKey("batch_size"); + return true; + })); + } + + @Test + void testGetDocuments() throws Exception { + List<String> ids = Arrays.asList("doc1", "doc2"); + String collection = "test_collection"; + Map<String, Object> extraArgs = new HashMap<>(); + + List<Document> expectedDocuments = + Arrays.asList( + new Document("content1", Map.of(), "doc1"), + new Document("content2", Map.of(), "doc2")); + + when(mockAdapter.callMethod(eq(mockVectorStore), eq("get"), any(Map.class))) + .thenReturn(Arrays.asList(mockPythonCollection, mockPythonCollection)); + when(mockAdapter.fromPythonDocuments(any())).thenReturn(expectedDocuments); + + List<Document> result = vectorStore.get(ids, collection, extraArgs); + + assertThat(result).isNotNull(); + assertThat(result).hasSize(2); + + verify(mockAdapter) + .callMethod( + eq(mockVectorStore), + eq("get"), + argThat( + kwargs -> 
{ + assertThat(kwargs).containsKey("ids"); + assertThat(kwargs).containsKey("collection"); + return true; + })); + } + + @Test + void testDeleteDocuments() throws Exception { + List<String> ids = Arrays.asList("doc1", "doc2"); + String collection = "test_collection"; + Map<String, Object> extraArgs = new HashMap<>(); + + when(mockAdapter.callMethod(eq(mockVectorStore), eq("delete"), any(Map.class))) + .thenReturn(null); + + vectorStore.delete(ids, collection, extraArgs); + + verify(mockAdapter) + .callMethod( + eq(mockVectorStore), + eq("delete"), + argThat( + kwargs -> { + assertThat(kwargs).containsKey("ids"); + assertThat(kwargs).containsKey("collection"); + return true; + })); + } + + @Test + void testSize() throws Exception { + String collection = "test_collection"; + long expectedSize = 100L; + + when(mockVectorStore.invokeMethod("size", collection)).thenReturn(expectedSize); + + long result = vectorStore.size(collection); + + assertThat(result).isEqualTo(expectedSize); + verify(mockVectorStore).invokeMethod("size", collection); + } + + @Test + void testInheritanceFromPythonVectorStore() { + assertThat(vectorStore).isInstanceOf(PythonVectorStore.class); + } + + @Test + void testImplementsCollectionManageableVectorStore() { + assertThat(vectorStore).isInstanceOf(CollectionManageableVectorStore.class); + } + + @Test + void testImplementsPythonResourceWrapper() { + assertThat(vectorStore) + .isInstanceOf( + org.apache.flink.agents.api.resource.python.PythonResourceWrapper.class); + } +} diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml index f06b302..b11009c 100644 --- a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml +++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml @@ -30,6 +30,11 @@ <artifactId>flink-agents-integrations-chat-models-ollama</artifactId> <version>${project.version}</version> </dependency> + 
<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-agents-integrations-embedding-models-ollama</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java new file mode 100644 index 0000000..6368bd1 --- /dev/null +++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.resource.test; + +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.Agent; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.annotation.EmbeddingModelConnection; +import org.apache.flink.agents.api.annotation.EmbeddingModelSetup; +import org.apache.flink.agents.api.annotation.VectorStore; +import org.apache.flink.agents.api.context.MemoryObject; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelConnection; +import org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelSetup; +import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent; +import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.api.vectorstores.python.PythonCollectionManageableVectorStore; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection; +import org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelSetup; +import org.junit.jupiter.api.Assertions; +import pemja.core.PythonException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Integration test agent for verifying vector store functionality with Python vector store + * implementation. 
+ *
+ * <p>This test agent validates:
+ *
+ * <ul>
+ *   <li>Python vector store resource registration and creation
+ *   <li>Python embedding model integration with vector store
+ *   <li>Collection management operations (create, get, delete)
+ *   <li>Document CRUD operations (add, get, delete, size)
+ *   <li>Context retrieval and similarity search
+ *   <li>Cross-language resource dependency (vector store depends on embedding model)
+ * </ul>
+ *
+ * <p>Used for e2e testing of the vector store subsystem with cross-language support.
+ */
+public class VectorStoreCrossLanguageAgent extends Agent {
+    public static final String OLLAMA_MODEL = "nomic-embed-text";
+    public static final String TEST_COLLECTION = "test_collection";
+
+    /**
+     * Describes the embedding model connection.
+     *
+     * <p>When the {@code EMBEDDING_TYPE} system property is {@code PYTHON} (the default) the
+     * descriptor wraps the Python Ollama connection; otherwise it uses the Java Ollama connection
+     * with a fixed local host and timeout.
+     */
+    @EmbeddingModelConnection
+    public static ResourceDescriptor embeddingConnection() {
+        if (System.getProperty("EMBEDDING_TYPE", "PYTHON").equals("PYTHON")) {
+            return ResourceDescriptor.Builder.newBuilder(
+                            PythonEmbeddingModelConnection.class.getName())
+                    .addInitialArgument(
+                            "module",
+                            "flink_agents.integrations.embedding_models.local.ollama_embedding_model")
+                    .addInitialArgument("clazz", "OllamaEmbeddingModelConnection")
+                    .build();
+        } else {
+            return ResourceDescriptor.Builder.newBuilder(
+                            OllamaEmbeddingModelConnection.class.getName())
+                    .addInitialArgument("host", "http://localhost:11434")
+                    .addInitialArgument("timeout", 60)
+                    .build();
+        }
+    }
+
+    /**
+     * Describes the embedding model setup bound to the {@code embeddingConnection} resource.
+     *
+     * <p>The {@code EMBEDDING_TYPE} system property selects the Python or the Java Ollama setup,
+     * in the same way as {@link #embeddingConnection()}.
+     */
+    @EmbeddingModelSetup
+    public static ResourceDescriptor embeddingModel() {
+        if (System.getProperty("EMBEDDING_TYPE", "PYTHON").equals("PYTHON")) {
+            return ResourceDescriptor.Builder.newBuilder(PythonEmbeddingModelSetup.class.getName())
+                    .addInitialArgument(
+                            "module",
+                            "flink_agents.integrations.embedding_models.local.ollama_embedding_model")
+                    .addInitialArgument("clazz", "OllamaEmbeddingModelSetup")
+                    .addInitialArgument("connection", "embeddingConnection")
+                    .addInitialArgument("model", OLLAMA_MODEL)
+                    .build();
+        } else {
+            return ResourceDescriptor.Builder.newBuilder(OllamaEmbeddingModelSetup.class.getName())
+                    .addInitialArgument("connection", "embeddingConnection")
+                    .addInitialArgument("model", OLLAMA_MODEL)
+                    .build();
+        }
+    }
+
+    /**
+     * Describes the vector store under test: the Python {@code ChromaVectorStore}, exposed through
+     * {@link PythonCollectionManageableVectorStore} and wired to the {@code embeddingModel}
+     * resource.
+     */
+    @VectorStore
+    public static ResourceDescriptor vectorStore() {
+        return ResourceDescriptor.Builder.newBuilder(
+                        PythonCollectionManageableVectorStore.class.getName())
+                .addInitialArgument(
+                        "module",
+                        "flink_agents.integrations.vector_stores.chroma.chroma_vector_store")
+                .addInitialArgument("clazz", "ChromaVectorStore")
+                .addInitialArgument("embedding_model", "embeddingModel")
+                .build();
+    }
+
+    /**
+     * Handles an input event.
+     *
+     * <p>While short-term memory has no {@code is_initialized} entry, this action first exercises
+     * collection management and document management on the vector store, then records the flag.
+     * In every case it finishes by requesting context retrieval for the input text.
+     *
+     * @param event the input event; its payload is cast to {@code String}
+     * @param ctx the runner context used for memory, resources and event emission
+     * @throws Exception if a vector store or memory operation fails
+     */
+    @Action(listenEvents = InputEvent.class)
+    public static void inputEvent(InputEvent event, RunnerContext ctx) throws Exception {
+        final String input = (String) event.getInput();
+
+        MemoryObject isInitialized = ctx.getShortTermMemory().get("is_initialized");
+        if (isInitialized == null) {
+            PythonCollectionManageableVectorStore vectorStore =
+                    (PythonCollectionManageableVectorStore)
+                            ctx.getResource("vectorStore", ResourceType.VECTOR_STORE);
+
+            // Initialize vector store
+            vectorStore.getOrCreateCollection(
+                    TEST_COLLECTION, Map.of("key1", "value1", "key2", "value2"));
+
+            CollectionManageableVectorStore.Collection collection =
+                    vectorStore.getCollection(TEST_COLLECTION);
+            // assertNotNull states the intent directly and does not depend on equals(null).
+            Assertions.assertNotNull(collection, "Vector store collection is null");
+            Assertions.assertEquals(
+                    TEST_COLLECTION,
+                    collection.getName(),
+                    "Vector store collection name is not test_collection");
+            Assertions.assertEquals(
+                    Map.of("key1", "value1", "key2", "value2"),
+                    collection.getMetadata(),
+                    "Vector store collection metadata is not correct");
+
+            System.out.println("[TEST] Vector store Collection Management PASSED");
+
+            // Looking up a deleted collection is expected to fail on the Python side.
+            vectorStore.deleteCollection(TEST_COLLECTION);
+            Assertions.assertThrows(
+                    PythonException.class, () -> vectorStore.getCollection(TEST_COLLECTION));
+
+            // Initialize collection
+            vectorStore.add(
+                    List.of(
+                            new Document(
+                                    "Apache Flink Agents is an Agentic AI framework based on Apache Flink.",
+                                    Map.of("category", "ai-agent", "source", "test"),
+                                    "doc1"),
+                            new Document(
+                                    "ChromaDB is a vector database for AI applications",
+                                    Map.of("category", "database", "source", "test"),
+                                    "doc2"),
+                            new Document(
+                                    "This is a test document used to verify the delete functionality.",
+                                    Map.of("category", "utility", "source", "test"),
+                                    "doc3")),
+                    null,
+                    Map.of());
+
+            // Test size
+            Assertions.assertEquals(3, vectorStore.size(null), "Vector store size is not 3");
+
+            // Test delete
+            vectorStore.delete(List.of("doc3"), null, Map.of());
+            Assertions.assertEquals(
+                    2, vectorStore.size(null), "Vector store size is not 2, doc3 was not deleted");
+
+            // Test get
+            Document doc = vectorStore.get(List.of("doc2"), null, Map.of()).get(0);
+            Assertions.assertEquals(
+                    "ChromaDB is a vector database for AI applications", doc.getContent());
+            Assertions.assertEquals(
+                    Map.of("category", "database", "source", "test"), doc.getMetadata());
+
+            System.out.println("[TEST] Vector store Document Management PASSED");
+
+            ctx.getShortTermMemory().set("is_initialized", true);
+        }
+
+        ctx.sendEvent(new ContextRetrievalRequestEvent(input, "vectorStore"));
+    }
+
+    /**
+     * Validates the retrieved documents and reports the outcome as an {@link OutputEvent}.
+     *
+     * <p>On success the output map holds {@code test_status=PASSED}, the retrieved count and a
+     * preview of the first document. On failure it holds {@code test_status=FAILED} and the error
+     * message, and the failure is rethrown after the event has been sent.
+     *
+     * @param event the retrieval response carrying the documents to validate
+     * @param ctx the runner context used to emit the result
+     */
+    @Action(listenEvents = ContextRetrievalResponseEvent.class)
+    public static void contextRetrievalResponseEvent(
+            ContextRetrievalResponseEvent event, RunnerContext ctx) {
+        final List<Document> documents = event.getDocuments();
+
+        Map<String, Object> result = new HashMap<>();
+        try {
+            // Basic validations similar in spirit to EmbeddingIntegrationAgent
+            if (documents == null) {
+                throw new AssertionError("Vector store returned null documents list");
+            }
+
+            if (documents.isEmpty()) {
+                throw new AssertionError("Vector store returned empty documents list");
+            }
+
+            int idx = 0;
+            for (Document doc : documents) {
+                if (doc == null) {
+                    throw new AssertionError("Document entry is null");
+                }
+
+                final String content = doc.getContent();
+                if (content == null || content.trim().isEmpty()) {
+                    throw new AssertionError(String.format("Document[%d] content is empty", idx));
+                }
+
+                // ID can be optional, but when present it must be non-empty
+                if (doc.getId() != null && doc.getId().trim().isEmpty()) {
+                    throw new AssertionError(String.format("Document[%d] id is empty string", idx));
+                }
+                idx++;
+            }
+
+            result.put("test_status", "PASSED");
+            result.put("retrieved_count", documents.size());
+            // Include preview of first doc
+            Document first = documents.get(0);
+            result.put("first_doc_id", first.getId());
+            result.put(
+                    "first_doc_preview",
+                    first.getContent().substring(0, Math.min(50, first.getContent().length())));
+
+            ctx.sendEvent(new OutputEvent(result));
+            System.out.printf("[TEST] Vector store retrieval PASSED, count=%d%n", documents.size());
+        } catch (AssertionError | RuntimeException e) {
+            // AssertionError is an Error, not an Exception: it must be caught explicitly,
+            // otherwise the validation failures above would skip the FAILED report entirely.
+            result.put("test_status", "FAILED");
+            result.put("error", e.getMessage());
+            ctx.sendEvent(new OutputEvent(result));
+            System.err.printf("[TEST] Vector store retrieval FAILED: %s%n", e.getMessage());
+            throw e;
+        }
+    }
+}
diff --git a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageTest.java b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageTest.java
new file mode 100644
index 0000000..0d53d49
--- /dev/null
+++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.flink.agents.resource.test.OllamaPreparationUtils.pullModel;
+import static org.apache.flink.agents.resource.test.VectorStoreCrossLanguageAgent.OLLAMA_MODEL;
+
+/**
+ * End-to-end test that runs {@link VectorStoreCrossLanguageAgent} once per embedding type and
+ * checks the result map the agent emits.
+ */
+public class VectorStoreCrossLanguageTest {
+
+    /** System property read by the agent to choose the embedding implementation. */
+    private static final String EMBEDDING_TYPE_PROPERTY = "EMBEDDING_TYPE";
+
+    private final boolean ollamaReady;
+
+    public VectorStoreCrossLanguageTest() throws IOException {
+        ollamaReady = pullModel(OLLAMA_MODEL);
+    }
+
+    /**
+     * Runs the agent over a single input and verifies its output.
+     *
+     * @param embeddingType value published through the {@code EMBEDDING_TYPE} system property
+     * @throws Exception if the job fails or the result iterator cannot be closed
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"JAVA", "PYTHON"})
+    public void testVectorStoreIntegration(String embeddingType) throws Exception {
+        // Check the precondition first so a skipped run leaves no global state behind.
+        Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not provided");
+
+        final String previousEmbeddingType = System.getProperty(EMBEDDING_TYPE_PROPERTY);
+        System.setProperty(EMBEDDING_TYPE_PROPERTY, embeddingType);
+        try {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+
+            final DataStreamSource<String> inputStream = env.fromData("What is Apache Flink");
+
+            final AgentsExecutionEnvironment agentEnv =
+                    AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+            final DataStream<Object> outputStream =
+                    agentEnv.fromDataStream(
+                                    inputStream, (KeySelector<String, String>) value -> "orderKey")
+                            .apply(new VectorStoreCrossLanguageAgent())
+                            .toDataStream();
+
+            // The iterator holds job resources; close it even when an assertion fails.
+            try (CloseableIterator<Object> results = outputStream.collectAsync()) {
+                agentEnv.execute();
+
+                checkResult(results);
+            }
+        } finally {
+            // Restore the property so the chosen type does not leak into other tests in this JVM.
+            if (previousEmbeddingType == null) {
+                System.clearProperty(EMBEDDING_TYPE_PROPERTY);
+            } else {
+                System.setProperty(EMBEDDING_TYPE_PROPERTY, previousEmbeddingType);
+            }
+        }
+    }
+
+    /**
+     * Asserts that the first emitted record is a result map reporting a passed run with at least
+     * one retrieved document and a non-empty preview.
+     */
+    @SuppressWarnings("unchecked")
+    private void checkResult(CloseableIterator<Object> results) {
+        Assertions.assertTrue(
+                results.hasNext(), "No output received from VectorStoreCrossLanguageAgent");
+
+        Object obj = results.next();
+        Assertions.assertInstanceOf(Map.class, obj, "Output must be a Map");
+
+        Map<String, Object> res = (Map<String, Object>) obj;
+        // Surface the agent-side error text when the run did not pass.
+        Assertions.assertEquals(
+                "PASSED",
+                res.get("test_status"),
+                () -> "Agent reported failure: " + res.get("error"));
+
+        Object count = res.get("retrieved_count");
+        Assertions.assertNotNull(count, "retrieved_count must exist");
+        // Fail instead of silently skipping the check when the value is not numeric.
+        Assertions.assertInstanceOf(Number.class, count, "retrieved_count must be a number");
+        Assertions.assertTrue(((Number) count).intValue() >= 1, "retrieved_count must be >= 1");
+
+        Object preview = res.get("first_doc_preview");
+        Assertions.assertTrue(
+                preview instanceof String && !((String) preview).trim().isEmpty(),
+                "first_doc_preview must be a non-empty string");
+
+        Object firstId = res.get("first_doc_id");
+        if (firstId != null) {
+            Assertions.assertTrue(
+                    firstId instanceof String && !((String) firstId).trim().isEmpty(),
+                    "first_doc_id when present must be a non-empty string");
+        }
+    }
+}
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
index 9f77676..5e9a0cc 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
@@ -26,6 +26,7 @@ import
org.apache.flink.agents.api.resource.Resource; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceType; import org.apache.flink.agents.api.resource.python.PythonResourceAdapter; +import org.apache.flink.agents.api.vectorstores.python.PythonCollectionManageableVectorStore; import pemja.core.object.PyObject; import java.lang.reflect.Constructor; @@ -50,7 +51,8 @@ public class PythonResourceProvider extends ResourceProvider { ResourceType.CHAT_MODEL, PythonChatModelSetup.class, ResourceType.CHAT_MODEL_CONNECTION, PythonChatModelConnection.class, ResourceType.EMBEDDING_MODEL, PythonEmbeddingModelSetup.class, - ResourceType.EMBEDDING_MODEL_CONNECTION, PythonEmbeddingModelConnection.class); + ResourceType.EMBEDDING_MODEL_CONNECTION, PythonEmbeddingModelConnection.class, + ResourceType.VECTOR_STORE, PythonCollectionManageableVectorStore.class); protected PythonResourceAdapter pythonResourceAdapter; diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java index ae95f40..7a9ad41 100644 --- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java +++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java @@ -33,6 +33,10 @@ import org.apache.flink.agents.api.resource.ResourceType; import org.apache.flink.agents.api.resource.SerializableResource; import org.apache.flink.agents.api.resource.python.PythonResourceAdapter; import org.apache.flink.agents.api.resource.python.PythonResourceWrapper; +import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore; +import org.apache.flink.agents.api.vectorstores.Document; +import org.apache.flink.agents.api.vectorstores.VectorStoreQuery; +import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult; import org.apache.flink.agents.plan.actions.Action; import 
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
 import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
@@ -198,6 +202,33 @@ public class AgentPlanTest {
             return null;
         }
 
+        @Override
+        public Object toPythonDocuments(List<Document> documents) {
+            return null;
+        }
+
+        @Override
+        public List<Document> fromPythonDocuments(List<PyObject> pythonDocuments) {
+            return List.of();
+        }
+
+        @Override
+        public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
+            return null;
+        }
+
+        @Override
+        public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
+                PyObject pythonVectorStoreQueryResult) {
+            return null;
+        }
+
+        @Override
+        public CollectionManageableVectorStore.Collection fromPythonCollection(
+                PyObject pythonCollection) {
+            return null;
+        }
+
         @Override
         public Object convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
             return null;
diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py
index 6a3a84e..de223dd 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -26,6 +26,11 @@ from flink_agents.api.events.event import InputEvent
 from flink_agents.api.resource import Resource, ResourceType, get_resource_class
 from flink_agents.api.tools.tool import ToolMetadata
 from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str
+from flink_agents.api.vector_stores.vector_store import (
+    Document,
+    VectorStoreQuery,
+    VectorStoreQueryMode,
+)
 from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING
 from flink_agents.runtime.java.java_resource_wrapper import (
     JavaGetResourceWrapper,
@@ -183,6 +188,36 @@ def update_java_chat_message(chat_message: ChatMessage, j_chat_message: Any) ->
     return chat_message.role.value
 
 
+def from_java_document(j_document: Any) -> Document:
+    """Convert a Java document to a Python document, copying a non-empty embedding."""
+    document = Document(
+        content=j_document.getContent(),
+        id=j_document.getId(),
+        metadata=j_document.getMetadata(),
+    )
+    if j_document.getEmbedding():
+        document.embedding = list(j_document.getEmbedding())
+    return document
+
+def update_java_document(document: Document, j_document: Any) -> None:
+    """Copy a Python document's fields onto an existing Java document in place."""
+    j_document.setContent(document.content)
+    j_document.setId(document.id)
+    j_document.setMetadata(document.metadata)
+    if document.embedding:
+        j_document.setEmbedding(tuple(document.embedding))
+
+
+def from_java_vector_store_query(j_query: Any) -> VectorStoreQuery:
+    """Convert a Java vector store query to a Python vector store query."""
+    return VectorStoreQuery(
+        mode=VectorStoreQueryMode(j_query.getMode().getValue()),
+        query_text=j_query.getQueryText(),
+        limit=j_query.getLimit(),
+        collection_name=j_query.getCollection(),
+        extra_args=j_query.getExtraArgs()
+    )
+
 def call_method(obj: Any, method_name: str, kwargs: Dict[str, Any]) -> Any:
     """Calls a method on `obj` by name and passes in positional and keyword arguments.
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
index b167e19..6c2c875 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
@@ -25,10 +25,16 @@ import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
 import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
 import org.apache.flink.agents.api.tools.Tool;
+import org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
 import pemja.core.PythonInterpreter;
 import pemja.core.object.PyObject;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 
@@ -60,6 +66,13 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter {
 
     static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + "update_java_chat_message";
 
+    static final String FROM_JAVA_DOCUMENT = PYTHON_MODULE_PREFIX + "from_java_document";
+
+    static final String UPDATE_JAVA_DOCUMENT = PYTHON_MODULE_PREFIX + "update_java_document";
+
+    static final String FROM_JAVA_VECTOR_STORE_QUERY =
+            PYTHON_MODULE_PREFIX + "from_java_vector_store_query";
+
     private final BiFunction<String, ResourceType, Resource> getResource;
     private final PythonInterpreter interpreter;
     private final JavaResourceAdapter javaResourceAdapter;
@@ -125,6 +138,50 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter {
         return chatMessage;
     }
 
+    /**
+     * Converts each Java document by invoking the Python {@code from_java_document} helper.
+     *
+     * @param documents the Java documents to convert
+     * @return a list holding one converted Python object per input document, in order
+     */
+    @Override
+    public Object toPythonDocuments(List<Document> documents) {
+        List<Object> pythonDocuments = new ArrayList<>(documents.size());
+        for (Document document : documents) {
+            pythonDocuments.add(interpreter.invoke(FROM_JAVA_DOCUMENT, document));
+        }
+        return pythonDocuments;
+    }
+
+    /**
+     * Builds Java documents from the {@code content}, {@code metadata} and {@code id} attributes
+     * of the given Python documents.
+     *
+     * @param pythonDocuments the Python document objects to read
+     * @return the converted documents, in order; the id is {@code null} when the Python document
+     *     has none
+     */
+    @Override
+    public List<Document> fromPythonDocuments(List<PyObject> pythonDocuments) {
+        List<Document> documents = new ArrayList<>(pythonDocuments.size());
+        for (PyObject pythonDocument : pythonDocuments) {
+            // pemja hands back a raw Map; its element types are not checked here.
+            @SuppressWarnings("unchecked")
+            Map<String, Object> metadata =
+                    (Map<String, Object>) pythonDocument.getAttr("metadata", Map.class);
+            // Document ids are optional: keep a missing id as null instead of failing on
+            // toString().
+            Object id = pythonDocument.getAttr("id");
+            documents.add(
+                    new Document(
+                            pythonDocument.getAttr("content").toString(),
+                            metadata,
+                            id == null ? null : id.toString()));
+        }
+        return documents;
+    }
+
+    /** Converts the query by invoking the Python {@code from_java_vector_store_query} helper. */
+    @Override
+    public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
+        return interpreter.invoke(FROM_JAVA_VECTOR_STORE_QUERY, query);
+    }
+
+    /**
+     * Wraps the {@code documents} attribute of a Python query result into a Java result.
+     *
+     * @param pythonVectorStoreQueryResult the Python query result object
+     * @return a Java result holding the converted documents
+     */
+    @Override
+    public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
+            PyObject pythonVectorStoreQueryResult) {
+        // pemja hands back a raw List; its element types are not checked here.
+        @SuppressWarnings("unchecked")
+        List<PyObject> pythonDocuments =
+                (List<PyObject>) pythonVectorStoreQueryResult.getAttr("documents", List.class);
+        return new VectorStoreQueryResult(fromPythonDocuments(pythonDocuments));
+    }
+
+    /**
+     * Builds a Java collection descriptor from the {@code name} and {@code metadata} attributes
+     * of a Python collection.
+     *
+     * @param pythonCollection the Python collection object
+     * @return the equivalent Java collection descriptor
+     */
+    @Override
+    public CollectionManageableVectorStore.Collection fromPythonCollection(
+            PyObject pythonCollection) {
+        // pemja hands back a raw Map; its element types are not checked here.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> metadata =
+                (Map<String, Object>) pythonCollection.getAttr("metadata", Map.class);
+        return new CollectionManageableVectorStore.Collection(
+                pythonCollection.getAttr("name").toString(), metadata);
+    }
+
     @Override
     public Object convertToPythonTool(Tool tool) {
         return interpreter.invoke(FROM_JAVA_TOOL, tool);
