This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit ea6db461399164cded131fc80a28af187f6a5aa0
Author: youjin <[email protected]>
AuthorDate: Wed Jan 14 09:30:29 2026 +0800

    [Feature][runtime] Support the use of Python VectorStore in Java
---
 .../api/resource/python/PythonResourceAdapter.java |  45 +++
 .../api/vectorstores/VectorStoreQueryMode.java     |  32 +-
 .../PythonCollectionManageableVectorStore.java     |  85 +++++
 .../api/vectorstores/python/PythonVectorStore.java | 156 +++++++++
 .../PythonCollectionManageableVectorStoreTest.java | 354 +++++++++++++++++++++
 .../pom.xml                                        |   5 +
 .../test/VectorStoreCrossLanguageAgent.java        | 239 ++++++++++++++
 .../test/VectorStoreCrossLanguageTest.java         | 102 ++++++
 .../resourceprovider/PythonResourceProvider.java   |   4 +-
 .../apache/flink/agents/plan/AgentPlanTest.java    |  31 ++
 python/flink_agents/runtime/python_java_utils.py   |  35 ++
 .../python/utils/PythonResourceAdapterImpl.java    |  57 ++++
 12 files changed, 1141 insertions(+), 4 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
 
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
index 89652ae..43f2fbe 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
@@ -20,8 +20,13 @@ package org.apache.flink.agents.api.resource.python;
 
 import org.apache.flink.agents.api.chat.messages.ChatMessage;
 import org.apache.flink.agents.api.tools.Tool;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
 import pemja.core.object.PyObject;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -66,6 +71,46 @@ public interface PythonResourceAdapter {
      */
     ChatMessage fromPythonChatMessage(Object pythonChatMessage);
 
+    /**
+     * Converts a list of Java Document objects to their Python equivalent.
+     *
+     * @param documents the list of Java Documents to convert
+     * @return the Python representation of the documents
+     */
+    Object toPythonDocuments(List<Document> documents);
+
+    /**
+     * Converts a list of Python Document objects back to a list of Java Documents.
+     *
+     * @param pythonDocuments the list of Python Document objects to convert
+     * @return the equivalent list of Java Documents
+     */
+    List<Document> fromPythonDocuments(List<PyObject> pythonDocuments);
+
+    /**
+     * Converts a Java VectorStoreQuery object to its Python equivalent.
+     *
+     * @param query the Java VectorStoreQuery to convert
+     * @return the Python representation of the vector store query
+     */
+    Object toPythonVectorStoreQuery(VectorStoreQuery query);
+
+    /**
+     * Converts a Python VectorStoreQueryResult object back to a Java 
VectorStoreQueryResult.
+     *
+     * @param pythonVectorStoreQueryResult the Python VectorStoreQueryResult 
object to convert
+     * @return the Java VectorStoreQueryResult representation
+     */
+    VectorStoreQueryResult fromPythonVectorStoreQueryResult(PyObject 
pythonVectorStoreQueryResult);
+
+    /**
+     * Converts a Python Collection object back to a Java Collection.
+     *
+     * @param pythonCollection the Python Collection object to convert
+     * @return the Java Collection representation
+     */
+    CollectionManageableVectorStore.Collection fromPythonCollection(PyObject 
pythonCollection);
+
     /**
      * Converts a Java Tool object to its Python equivalent.
      *
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java
 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java
index 07cd35e..8e06200 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/VectorStoreQueryMode.java
@@ -27,9 +27,35 @@ package org.apache.flink.agents.api.vectorstores;
  */
 public enum VectorStoreQueryMode {
     /** Semantic similarity search using embeddings. */
-    SEMANTIC,
+    SEMANTIC("semantic");
     /** Keyword/lexical search (store dependent). TODO: term-based retrieval */
-    //    KEYWORD,
+    //    KEYWORD("keyword"),
     /** Hybrid search combining semantic and keyword results. TODO: semantic + 
keyword retrieval */
-    //    HYBRID;
+    //    HYBRID("hybrid");
+
+    private final String value;
+
+    VectorStoreQueryMode(String value) {
+        this.value = value;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    /**
+     * Get VectorStoreQueryMode from string value.
+     *
+     * @param value the string value
+     * @return the corresponding VectorStoreQueryMode
+     * @throws IllegalArgumentException if no matching VectorStoreQueryMode is found
+     */
+    public static VectorStoreQueryMode fromValue(String value) {
+        for (VectorStoreQueryMode type : VectorStoreQueryMode.values()) {
+            if (type.value.equals(value)) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException("Unknown VectorStoreQueryMode 
value: " + value);
+    }
 }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStore.java
 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStore.java
new file mode 100644
index 0000000..f58d108
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStore.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores.python;
+
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import pemja.core.object.PyObject;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Python-based implementation of VectorStore with collection management 
capabilities that bridges
+ * Java and Python vector store functionality. This class wraps a Python 
vector store object and
+ * provides Java interface compatibility while delegating actual storage, 
retrieval, and collection
+ * management operations to the underlying Python implementation.
+ *
+ * <p>Unlike {@link PythonVectorStore}, this implementation provides 
additional collection
+ * management features, allowing for operations such as creating, listing, and 
deleting collections
+ * within the vector store.
+ */
+public class PythonCollectionManageableVectorStore extends PythonVectorStore
+        implements CollectionManageableVectorStore {
+    /**
+     * Creates a new PythonCollectionManageableVectorStore.
+     *
+     * @param adapter The Python resource adapter (required by 
PythonResourceProvider's
+     *     reflection-based instantiation); used to invoke the Python store and 
convert results
+     * @param vectorStore The Python vector store object
+     * @param descriptor The resource descriptor
+     * @param getResource Function to retrieve resources by name and type
+     */
+    public PythonCollectionManageableVectorStore(
+            PythonResourceAdapter adapter,
+            PyObject vectorStore,
+            ResourceDescriptor descriptor,
+            BiFunction<String, ResourceType, Resource> getResource) {
+        super(adapter, vectorStore, descriptor, getResource);
+    }
+
+    @Override
+    public Collection getOrCreateCollection(String name, Map<String, Object> 
metadata)
+            throws Exception {
+        Map<String, Object> kwargs = new HashMap<>();
+        kwargs.put("name", name);
+        if (metadata != null && !metadata.isEmpty()) {
+            kwargs.put("metadata", metadata);
+        }
+
+        Object result = this.adapter.callMethod(vectorStore, 
"get_or_create_collection", kwargs);
+        return adapter.fromPythonCollection((PyObject) result);
+    }
+
+    @Override
+    public Collection getCollection(String name) throws Exception {
+        Object result = this.vectorStore.invokeMethod("get_collection", name);
+        return adapter.fromPythonCollection((PyObject) result);
+    }
+
+    @Override
+    public Collection deleteCollection(String name) throws Exception {
+        Object result = this.vectorStore.invokeMethod("delete_collection", 
name);
+        return adapter.fromPythonCollection((PyObject) result);
+    }
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
new file mode 100644
index 0000000..c570ad5
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores.python;
+
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
+import pemja.core.object.PyObject;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+/**
+ * Python-based implementation of VectorStore that bridges Java and Python 
vector store
+ * functionality. This class wraps a Python vector store object and provides 
Java interface
+ * compatibility while delegating actual storage and retrieval operations to 
the underlying Python
+ * implementation.
+ *
+ * <p>This class serves as a connection layer between Java and Python vector 
store environments,
+ * enabling seamless integration of Python-based vector stores within Java 
applications.
+ */
+public class PythonVectorStore extends BaseVectorStore implements 
PythonResourceWrapper {
+    protected final PyObject vectorStore;
+    protected final PythonResourceAdapter adapter;
+
+    /**
+     * Creates a new PythonVectorStore.
+     *
+     * @param adapter The Python resource adapter (required by 
PythonResourceProvider's
+     *     reflection-based instantiation); used to call the Python store and 
convert arguments/results
+     * @param vectorStore The Python vector store object
+     * @param descriptor The resource descriptor
+     * @param getResource Function to retrieve resources by name and type
+     */
+    public PythonVectorStore(
+            PythonResourceAdapter adapter,
+            PyObject vectorStore,
+            ResourceDescriptor descriptor,
+            BiFunction<String, ResourceType, Resource> getResource) {
+        super(descriptor, getResource);
+        this.vectorStore = vectorStore;
+        this.adapter = adapter;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<String> add(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        Object pythonDocuments = adapter.toPythonDocuments(documents);
+
+        Map<String, Object> kwargs = new HashMap<>(extraArgs);
+        kwargs.put("documents", pythonDocuments);
+
+        if (collection != null) {
+            kwargs.put("collection", collection);
+        }
+
+        return (List<String>) adapter.callMethod(vectorStore, "add", kwargs);
+    }
+
+    @Override
+    public VectorStoreQueryResult query(VectorStoreQuery query) {
+        Object pythonQuery = adapter.toPythonVectorStoreQuery(query);
+
+        PyObject pythonResult = (PyObject) vectorStore.invokeMethod("query", 
pythonQuery);
+
+        return adapter.fromPythonVectorStoreQueryResult(pythonResult);
+    }
+
+    @Override
+    public long size(@Nullable String collection) throws Exception {
+        return (long) vectorStore.invokeMethod("size", collection);
+    }
+
+    @Override
+    public List<Document> get(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        Map<String, Object> kwargs = new HashMap<>(extraArgs);
+        if (ids != null && !ids.isEmpty()) {
+            kwargs.put("ids", ids);
+        }
+        if (collection != null) {
+            kwargs.put("collection", collection);
+        }
+
+        Object pythonDocuments = adapter.callMethod(vectorStore, "get", 
kwargs);
+
+        return adapter.fromPythonDocuments((List<PyObject>) pythonDocuments);
+    }
+
+    @Override
+    public void delete(
+            @Nullable List<String> ids, @Nullable String collection, 
Map<String, Object> extraArgs)
+            throws IOException {
+        Map<String, Object> kwargs = new HashMap<>(extraArgs);
+        if (ids != null && !ids.isEmpty()) {
+            kwargs.put("ids", ids);
+        }
+        if (collection != null) {
+            kwargs.put("collection", collection);
+        }
+        adapter.callMethod(vectorStore, "delete", kwargs);
+    }
+
+    @Override
+    public Map<String, Object> getStoreKwargs() {
+        return Map.of();
+    }
+
+    @Override
+    protected List<Document> queryEmbedding(
+            float[] embedding, int limit, @Nullable String collection, 
Map<String, Object> args) {
+        return List.of();
+    }
+
+    @Override
+    protected List<String> addEmbedding(
+            List<Document> documents, @Nullable String collection, Map<String, 
Object> extraArgs)
+            throws IOException {
+        return List.of();
+    }
+
+    @Override
+    public Object getPythonResource() {
+        return vectorStore;
+    }
+}
diff --git 
a/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
 
b/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
new file mode 100644
index 0000000..d92b716
--- /dev/null
+++ 
b/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.vectorstores.python;
+
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import pemja.core.object.PyObject;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PythonCollectionManageableVectorStoreTest {
+    @Mock private PythonResourceAdapter mockAdapter;
+
+    @Mock private PyObject mockVectorStore;
+
+    @Mock private ResourceDescriptor mockDescriptor;
+
+    @Mock private BiFunction<String, ResourceType, Resource> mockGetResource;
+
+    @Mock private PyObject mockPythonCollection;
+
+    private PythonCollectionManageableVectorStore vectorStore;
+    private AutoCloseable mocks;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mocks = MockitoAnnotations.openMocks(this);
+        vectorStore =
+                new PythonCollectionManageableVectorStore(
+                        mockAdapter, mockVectorStore, mockDescriptor, 
mockGetResource);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (mocks != null) {
+            mocks.close();
+        }
+    }
+
+    @Test
+    void testConstructor() {
+        assertThat(vectorStore).isNotNull();
+        assertThat(vectorStore.getPythonResource()).isEqualTo(mockVectorStore);
+    }
+
+    @Test
+    void testGetPythonResourceWithNullVectorStore() {
+        PythonCollectionManageableVectorStore storeWithNull =
+                new PythonCollectionManageableVectorStore(
+                        mockAdapter, null, mockDescriptor, mockGetResource);
+
+        Object result = storeWithNull.getPythonResource();
+
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testGetOrCreateCollectionWithMetadata() throws Exception {
+        String collectionName = "test_collection";
+        Map<String, Object> metadata = new HashMap<>();
+        metadata.put("key1", "value1");
+        metadata.put("key2", "value2");
+
+        CollectionManageableVectorStore.Collection expectedCollection =
+                new CollectionManageableVectorStore.Collection(collectionName, 
metadata);
+
+        when(mockAdapter.callMethod(
+                        eq(mockVectorStore), eq("get_or_create_collection"), 
any(Map.class)))
+                .thenReturn(mockPythonCollection);
+        
when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection);
+
+        CollectionManageableVectorStore.Collection result =
+                vectorStore.getOrCreateCollection(collectionName, metadata);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getName()).isEqualTo(collectionName);
+        assertThat(result.getMetadata()).isEqualTo(metadata);
+
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockVectorStore),
+                        eq("get_or_create_collection"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("name");
+                                    assertThat(kwargs).containsKey("metadata");
+                                    
assertThat(kwargs.get("name")).isEqualTo(collectionName);
+                                    
assertThat(kwargs.get("metadata")).isEqualTo(metadata);
+                                    return true;
+                                }));
+    }
+
+    @Test
+    void testGetOrCreateCollectionWithEmptyMetadata() throws Exception {
+        String collectionName = "test_collection";
+        Map<String, Object> metadata = new HashMap<>();
+
+        CollectionManageableVectorStore.Collection expectedCollection =
+                new CollectionManageableVectorStore.Collection(collectionName, 
metadata);
+
+        when(mockAdapter.callMethod(
+                        eq(mockVectorStore), eq("get_or_create_collection"), 
any(Map.class)))
+                .thenReturn(mockPythonCollection);
+        
when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection);
+
+        CollectionManageableVectorStore.Collection result =
+                vectorStore.getOrCreateCollection(collectionName, metadata);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getName()).isEqualTo(collectionName);
+
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockVectorStore),
+                        eq("get_or_create_collection"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("name");
+                                    
assertThat(kwargs).doesNotContainKey("metadata");
+                                    return true;
+                                }));
+    }
+
+    @Test
+    void testGetOrCreateCollectionWithNullMetadata() throws Exception {
+        String collectionName = "test_collection";
+
+        CollectionManageableVectorStore.Collection expectedCollection =
+                new CollectionManageableVectorStore.Collection(collectionName, 
null);
+
+        when(mockAdapter.callMethod(
+                        eq(mockVectorStore), eq("get_or_create_collection"), 
any(Map.class)))
+                .thenReturn(mockPythonCollection);
+        
when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection);
+
+        CollectionManageableVectorStore.Collection result =
+                vectorStore.getOrCreateCollection(collectionName, null);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getName()).isEqualTo(collectionName);
+
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockVectorStore),
+                        eq("get_or_create_collection"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("name");
+                                    
assertThat(kwargs).doesNotContainKey("metadata");
+                                    return true;
+                                }));
+    }
+
+    @Test
+    void testGetCollection() throws Exception {
+        String collectionName = "existing_collection";
+        Map<String, Object> metadata = Map.of("type", "test");
+
+        CollectionManageableVectorStore.Collection expectedCollection =
+                new CollectionManageableVectorStore.Collection(collectionName, 
metadata);
+
+        when(mockVectorStore.invokeMethod("get_collection", collectionName))
+                .thenReturn(mockPythonCollection);
+        
when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection);
+
+        CollectionManageableVectorStore.Collection result =
+                vectorStore.getCollection(collectionName);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getName()).isEqualTo(collectionName);
+        assertThat(result.getMetadata()).isEqualTo(metadata);
+
+        verify(mockVectorStore).invokeMethod("get_collection", collectionName);
+        verify(mockAdapter).fromPythonCollection(mockPythonCollection);
+    }
+
+    @Test
+    void testDeleteCollection() throws Exception {
+        String collectionName = "collection_to_delete";
+        Map<String, Object> metadata = Map.of("status", "deleted");
+
+        CollectionManageableVectorStore.Collection expectedCollection =
+                new CollectionManageableVectorStore.Collection(collectionName, 
metadata);
+
+        when(mockVectorStore.invokeMethod("delete_collection", collectionName))
+                .thenReturn(mockPythonCollection);
+        
when(mockAdapter.fromPythonCollection(mockPythonCollection)).thenReturn(expectedCollection);
+
+        CollectionManageableVectorStore.Collection result =
+                vectorStore.deleteCollection(collectionName);
+
+        assertThat(result).isNotNull();
+        assertThat(result.getName()).isEqualTo(collectionName);
+
+        verify(mockVectorStore).invokeMethod("delete_collection", 
collectionName);
+        verify(mockAdapter).fromPythonCollection(mockPythonCollection);
+    }
+
+    @Test
+    void testAddDocuments() throws Exception {
+        List<Document> documents =
+                Arrays.asList(
+                        new Document("content1", Map.of("key", "value1"), 
"doc1"),
+                        new Document("content2", Map.of("key", "value2"), 
"doc2"));
+        String collection = "test_collection";
+        Map<String, Object> extraArgs = Map.of("batch_size", 10);
+
+        List<String> expectedIds = Arrays.asList("doc1", "doc2");
+
+        when(mockAdapter.toPythonDocuments(documents)).thenReturn(new 
Object());
+        when(mockAdapter.callMethod(eq(mockVectorStore), eq("add"), 
any(Map.class)))
+                .thenReturn(expectedIds);
+
+        List<String> result = vectorStore.add(documents, collection, 
extraArgs);
+
+        assertThat(result).isNotNull();
+        assertThat(result).hasSize(2);
+        assertThat(result).containsExactly("doc1", "doc2");
+
+        verify(mockAdapter).toPythonDocuments(documents);
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockVectorStore),
+                        eq("add"),
+                        argThat(
+                                kwargs -> {
+                                    
assertThat(kwargs).containsKey("documents");
+                                    
assertThat(kwargs).containsKey("collection");
+                                    
assertThat(kwargs).containsKey("batch_size");
+                                    return true;
+                                }));
+    }
+
+    @Test
+    void testGetDocuments() throws Exception {
+        List<String> ids = Arrays.asList("doc1", "doc2");
+        String collection = "test_collection";
+        Map<String, Object> extraArgs = new HashMap<>();
+
+        List<Document> expectedDocuments =
+                Arrays.asList(
+                        new Document("content1", Map.of(), "doc1"),
+                        new Document("content2", Map.of(), "doc2"));
+
+        when(mockAdapter.callMethod(eq(mockVectorStore), eq("get"), 
any(Map.class)))
+                .thenReturn(Arrays.asList(mockPythonCollection, 
mockPythonCollection));
+        
when(mockAdapter.fromPythonDocuments(any())).thenReturn(expectedDocuments);
+
+        List<Document> result = vectorStore.get(ids, collection, extraArgs);
+
+        assertThat(result).isNotNull();
+        assertThat(result).hasSize(2);
+
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockVectorStore),
+                        eq("get"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("ids");
+                                    
assertThat(kwargs).containsKey("collection");
+                                    return true;
+                                }));
+    }
+
+    @Test
+    void testDeleteDocuments() throws Exception {
+        List<String> ids = Arrays.asList("doc1", "doc2");
+        String collection = "test_collection";
+        Map<String, Object> extraArgs = new HashMap<>();
+
+        when(mockAdapter.callMethod(eq(mockVectorStore), eq("delete"), 
any(Map.class)))
+                .thenReturn(null);
+
+        vectorStore.delete(ids, collection, extraArgs);
+
+        verify(mockAdapter)
+                .callMethod(
+                        eq(mockVectorStore),
+                        eq("delete"),
+                        argThat(
+                                kwargs -> {
+                                    assertThat(kwargs).containsKey("ids");
+                                    
assertThat(kwargs).containsKey("collection");
+                                    return true;
+                                }));
+    }
+
+    @Test
+    void testSize() throws Exception {
+        String collection = "test_collection";
+        long expectedSize = 100L;
+
+        when(mockVectorStore.invokeMethod("size", 
collection)).thenReturn(expectedSize);
+
+        long result = vectorStore.size(collection);
+
+        assertThat(result).isEqualTo(expectedSize);
+        verify(mockVectorStore).invokeMethod("size", collection);
+    }
+
+    @Test
+    void testInheritanceFromPythonVectorStore() {
+        assertThat(vectorStore).isInstanceOf(PythonVectorStore.class);
+    }
+
+    @Test
+    void testImplementsCollectionManageableVectorStore() {
+        
assertThat(vectorStore).isInstanceOf(CollectionManageableVectorStore.class);
+    }
+
+    @Test
+    void testImplementsPythonResourceWrapper() {
+        assertThat(vectorStore)
+                .isInstanceOf(
+                        
org.apache.flink.agents.api.resource.python.PythonResourceWrapper.class);
+    }
+}
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
index f06b302..b11009c 100644
--- a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/pom.xml
@@ -30,6 +30,11 @@
             
<artifactId>flink-agents-integrations-chat-models-ollama</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-embedding-models-ollama</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
new file mode 100644
index 0000000..6368bd1
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageAgent.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.EmbeddingModelConnection;
+import org.apache.flink.agents.api.annotation.EmbeddingModelSetup;
+import org.apache.flink.agents.api.annotation.VectorStore;
+import org.apache.flink.agents.api.context.MemoryObject;
+import org.apache.flink.agents.api.context.RunnerContext;
+import 
org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelConnection;
+import 
org.apache.flink.agents.api.embedding.model.python.PythonEmbeddingModelSetup;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import 
org.apache.flink.agents.api.vectorstores.python.PythonCollectionManageableVectorStore;
+import 
org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelConnection;
+import 
org.apache.flink.agents.integrations.embeddingmodels.ollama.OllamaEmbeddingModelSetup;
+import org.junit.jupiter.api.Assertions;
+import pemja.core.PythonException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test agent for verifying vector store functionality with a Python vector store
+ * implementation.
+ *
+ * <p>This test agent validates:
+ *
+ * <ul>
+ *   <li>Python vector store resource registration and creation
+ *   <li>Python embedding model integration with the vector store
+ *   <li>Collection management operations (create, get, delete)
+ *   <li>Document CRUD operations (add, get, delete, size)
+ *   <li>Context retrieval and similarity search
+ *   <li>Cross-language resource dependency (the vector store depends on the embedding model)
+ * </ul>
+ *
+ * <p>Used for e2e testing of the vector store subsystem with cross-language support.
+ */
+public class VectorStoreCrossLanguageAgent extends Agent {
+    public static final String OLLAMA_MODEL = "nomic-embed-text";
+    public static final String TEST_COLLECTION = "test_collection";
+
+    /**
+     * Describes the embedding model connection resource.
+     *
+     * <p>The flavor is selected by the {@code EMBEDDING_TYPE} system property: {@code PYTHON}
+     * (the default) wraps the Python Ollama connection, any other value uses the Java Ollama
+     * connection against a local Ollama server.
+     *
+     * @return the descriptor of the selected embedding model connection
+     */
+    @EmbeddingModelConnection
+    public static ResourceDescriptor embeddingConnection() {
+        if ("PYTHON".equals(System.getProperty("EMBEDDING_TYPE", "PYTHON"))) {
+            return ResourceDescriptor.Builder.newBuilder(
+                            PythonEmbeddingModelConnection.class.getName())
+                    .addInitialArgument(
+                            "module",
+                            "flink_agents.integrations.embedding_models.local.ollama_embedding_model")
+                    .addInitialArgument("clazz", "OllamaEmbeddingModelConnection")
+                    .build();
+        } else {
+            return ResourceDescriptor.Builder.newBuilder(
+                            OllamaEmbeddingModelConnection.class.getName())
+                    .addInitialArgument("host", "http://localhost:11434")
+                    .addInitialArgument("timeout", 60)
+                    .build();
+        }
+    }
+
+    /**
+     * Describes the embedding model setup bound to the {@code embeddingConnection} resource; the
+     * Python or Java flavor is picked by the {@code EMBEDDING_TYPE} system property (default
+     * {@code PYTHON}).
+     */
+    @EmbeddingModelSetup
+    public static ResourceDescriptor embeddingModel() {
+        final boolean usePython = System.getProperty("EMBEDDING_TYPE", "PYTHON").equals("PYTHON");
+        if (!usePython) {
+            return ResourceDescriptor.Builder.newBuilder(OllamaEmbeddingModelSetup.class.getName())
+                    .addInitialArgument("connection", "embeddingConnection")
+                    .addInitialArgument("model", OLLAMA_MODEL)
+                    .build();
+        }
+
+        return ResourceDescriptor.Builder.newBuilder(PythonEmbeddingModelSetup.class.getName())
+                .addInitialArgument(
+                        "module",
+                        "flink_agents.integrations.embedding_models.local.ollama_embedding_model")
+                .addInitialArgument("clazz", "OllamaEmbeddingModelSetup")
+                .addInitialArgument("connection", "embeddingConnection")
+                .addInitialArgument("model", OLLAMA_MODEL)
+                .build();
+    }
+
+    /**
+     * Describes the Python Chroma vector store, configured with the {@code embeddingModel}
+     * resource as its embedding model.
+     */
+    @VectorStore
+    public static ResourceDescriptor vectorStore() {
+        final String wrapperClass = PythonCollectionManageableVectorStore.class.getName();
+        final String pythonModule =
+                "flink_agents.integrations.vector_stores.chroma.chroma_vector_store";
+
+        return ResourceDescriptor.Builder.newBuilder(wrapperClass)
+                .addInitialArgument("module", pythonModule)
+                .addInitialArgument("clazz", "ChromaVectorStore")
+                .addInitialArgument("embedding_model", "embeddingModel")
+                .build();
+    }
+
+    /**
+     * Handles an input event: on the first event for a key it exercises collection management and
+     * document CRUD on the Python vector store, then always requests context retrieval for the
+     * input text.
+     *
+     * @param event the input event; its payload is cast to {@code String} and used as the query
+     * @param ctx runner context used for short-term memory, resource lookup and event sending
+     * @throws Exception if a vector store operation or an assertion fails
+     */
+    @Action(listenEvents = InputEvent.class)
+    public static void inputEvent(InputEvent event, RunnerContext ctx) throws Exception {
+        final String input = (String) event.getInput();
+
+        // The one-time checks below are guarded by a flag kept in short-term memory.
+        MemoryObject isInitialized = ctx.getShortTermMemory().get("is_initialized");
+        if (isInitialized == null) {
+            PythonCollectionManageableVectorStore vectorStore =
+                    (PythonCollectionManageableVectorStore)
+                            ctx.getResource("vectorStore", ResourceType.VECTOR_STORE);
+
+            // Initialize vector store
+            vectorStore.getOrCreateCollection(
+                    TEST_COLLECTION, Map.of("key1", "value1", "key2", "value2"));
+
+            CollectionManageableVectorStore.Collection collection =
+                    vectorStore.getCollection(TEST_COLLECTION);
+            Assertions.assertNotNull(collection, "Vector store collection is null");
+            Assertions.assertEquals(
+                    TEST_COLLECTION,
+                    collection.getName(),
+                    "Vector store collection name is not test_collection");
+            Assertions.assertEquals(
+                    Map.of("key1", "value1", "key2", "value2"),
+                    collection.getMetadata(),
+                    "Vector store collection metadata is not correct");
+
+            System.out.println("[TEST] Vector store Collection Management PASSED");
+
+            // Looking up a deleted collection is expected to fail on the Python side.
+            vectorStore.deleteCollection(TEST_COLLECTION);
+            Assertions.assertThrows(
+                    PythonException.class, () -> vectorStore.getCollection(TEST_COLLECTION));
+
+            // Initialize collection (a null collection name is passed to the store as-is)
+            vectorStore.add(
+                    List.of(
+                            new Document(
+                                    "Apache Flink Agents is an Agentic AI framework based on Apache Flink.",
+                                    Map.of("category", "ai-agent", "source", "test"),
+                                    "doc1"),
+                            new Document(
+                                    "ChromaDB is a vector database for AI applications",
+                                    Map.of("category", "database", "source", "test"),
+                                    "doc2"),
+                            new Document(
+                                    "This is a test document used to verify the delete functionality.",
+                                    Map.of("category", "utility", "source", "test"),
+                                    "doc3")),
+                    null,
+                    Map.of());
+
+            // Test size
+            Assertions.assertEquals(3, vectorStore.size(null), "Vector store size is not 3");
+
+            // Test delete
+            vectorStore.delete(List.of("doc3"), null, Map.of());
+            Assertions.assertEquals(
+                    2, vectorStore.size(null), "Vector store size is not 2, doc3 was not deleted");
+
+            // Test get; fail with a clear message instead of an IndexOutOfBoundsException
+            List<Document> fetched = vectorStore.get(List.of("doc2"), null, Map.of());
+            Assertions.assertFalse(
+                    fetched.isEmpty(), "Vector store returned no document for id doc2");
+            Document doc = fetched.get(0);
+            Assertions.assertEquals(
+                    "ChromaDB is a vector database for AI applications", doc.getContent());
+            Assertions.assertEquals(
+                    Map.of("category", "database", "source", "test"), doc.getMetadata());
+
+            System.out.println("[TEST] Vector store Document Management PASSED");
+
+            ctx.getShortTermMemory().set("is_initialized", true);
+        }
+
+        ctx.sendEvent(new ContextRetrievalRequestEvent(input, "vectorStore"));
+    }
+
+    @Action(listenEvents = ContextRetrievalResponseEvent.class)
+    public static void contextRetrievalResponseEvent(
+            ContextRetrievalResponseEvent event, RunnerContext ctx) {
+        final List<Document> documents = event.getDocuments();
+
+        Map<String, Object> result = new HashMap<>();
+        try {
+            // Basic validations similar in spirit to EmbeddingIntegrationAgent
+            if (documents == null) {
+                throw new AssertionError("Vector store returned null documents 
list");
+            }
+
+            if (documents.isEmpty()) {
+                throw new AssertionError("Vector store returned empty 
documents list");
+            }
+
+            int idx = 0;
+            for (Document doc : documents) {
+                if (doc == null) {
+                    throw new AssertionError("Document entry is null");
+                }
+
+                final String content = doc.getContent();
+                if (content == null || content.trim().isEmpty()) {
+                    throw new AssertionError(String.format("Document[%d] 
content is empty", idx));
+                }
+
+                // ID can be optional, but when present it must be non-empty
+                if (doc.getId() != null && doc.getId().trim().isEmpty()) {
+                    throw new AssertionError(String.format("Document[%d] id is 
empty string", idx));
+                }
+                idx++;
+            }
+
+            result.put("test_status", "PASSED");
+            result.put("retrieved_count", documents.size());
+            // Include preview of first doc
+            Document first = documents.get(0);
+            result.put("first_doc_id", first.getId());
+            result.put(
+                    "first_doc_preview",
+                    first.getContent().substring(0, Math.min(50, 
first.getContent().length())));
+
+            ctx.sendEvent(new OutputEvent(result));
+            System.out.printf("[TEST] Vector store retrieval PASSED, 
count=%d%n", documents.size());
+        } catch (Exception e) {
+            result.put("test_status", "FAILED");
+            result.put("error", e.getMessage());
+            ctx.sendEvent(new OutputEvent(result));
+            System.err.printf("[TEST] Vector store retrieval FAILED: %s%n", 
e.getMessage());
+            throw e;
+        }
+    }
+}
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageTest.java
 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageTest.java
new file mode 100644
index 0000000..0d53d49
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-resource-cross-language/src/test/java/org/apache/flink/agents/resource/test/VectorStoreCrossLanguageTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.resource.test;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static 
org.apache.flink.agents.resource.test.OllamaPreparationUtils.pullModel;
+import static 
org.apache.flink.agents.resource.test.VectorStoreCrossLanguageAgent.OLLAMA_MODEL;
+
+/**
+ * End-to-end test that runs {@link VectorStoreCrossLanguageAgent} once per embedding flavor
+ * ({@code JAVA} and {@code PYTHON}) and checks the result map the agent emits.
+ *
+ * <p>The test is skipped when the Ollama model could not be pulled.
+ */
+public class VectorStoreCrossLanguageTest {
+
+    /** System property read by the agent to choose the embedding implementation. */
+    private static final String EMBEDDING_TYPE_PROPERTY = "EMBEDDING_TYPE";
+
+    private final boolean ollamaReady;
+
+    public VectorStoreCrossLanguageTest() throws IOException {
+        ollamaReady = pullModel(OLLAMA_MODEL);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"JAVA", "PYTHON"})
+    public void testVectorStoreIntegration(String embeddingType) throws Exception {
+        Assumptions.assumeTrue(ollamaReady, "Ollama Server information is not provided");
+
+        // Remember the previous value so the property does not leak into other tests.
+        final String previousEmbeddingType = System.getProperty(EMBEDDING_TYPE_PROPERTY);
+        System.setProperty(EMBEDDING_TYPE_PROPERTY, embeddingType);
+        try {
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.setParallelism(1);
+
+            final DataStreamSource<String> inputStream = env.fromData("What is Apache Flink");
+
+            final AgentsExecutionEnvironment agentEnv =
+                    AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+            final DataStream<Object> outputStream =
+                    agentEnv.fromDataStream(
+                                    inputStream, (KeySelector<String, String>) value -> "orderKey")
+                            .apply(new VectorStoreCrossLanguageAgent())
+                            .toDataStream();
+
+            // The collecting iterator holds job resources and must be closed.
+            try (CloseableIterator<Object> results = outputStream.collectAsync()) {
+                agentEnv.execute();
+                checkResult(results);
+            }
+        } finally {
+            if (previousEmbeddingType == null) {
+                System.clearProperty(EMBEDDING_TYPE_PROPERTY);
+            } else {
+                System.setProperty(EMBEDDING_TYPE_PROPERTY, previousEmbeddingType);
+            }
+        }
+    }
+
+    /** Asserts that the first collected output is a result map reporting a passed retrieval. */
+    private void checkResult(CloseableIterator<Object> results) {
+        Assertions.assertTrue(
+                results.hasNext(), "No output received from VectorStoreCrossLanguageAgent");
+
+        Object obj = results.next();
+        Assertions.assertInstanceOf(Map.class, obj, "Output must be a Map");
+
+        @SuppressWarnings("unchecked") // checked by assertInstanceOf above; keys are strings
+        Map<String, Object> res = (Map<String, Object>) obj;
+        Assertions.assertEquals(
+                "PASSED",
+                res.get("test_status"),
+                () -> "Agent reported failure: " + res.get("error"));
+
+        Object count = res.get("retrieved_count");
+        Assertions.assertNotNull(count, "retrieved_count must exist");
+        // A non-numeric count must fail the test instead of being silently accepted.
+        Number retrievedCount =
+                Assertions.assertInstanceOf(
+                        Number.class, count, "retrieved_count must be a number");
+        Assertions.assertTrue(retrievedCount.intValue() >= 1, "retrieved_count must be >= 1");
+
+        Object preview = res.get("first_doc_preview");
+        Assertions.assertTrue(
+                preview instanceof String && !((String) preview).trim().isEmpty(),
+                "first_doc_preview must be a non-empty string");
+
+        Object firstId = res.get("first_doc_id");
+        if (firstId != null) {
+            Assertions.assertTrue(
+                    firstId instanceof String && !((String) firstId).trim().isEmpty(),
+                    "first_doc_id when present must be a non-empty string");
+        }
+    }
+}
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
index 9f77676..5e9a0cc 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/resourceprovider/PythonResourceProvider.java
@@ -26,6 +26,7 @@ import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import 
org.apache.flink.agents.api.vectorstores.python.PythonCollectionManageableVectorStore;
 import pemja.core.object.PyObject;
 
 import java.lang.reflect.Constructor;
@@ -50,7 +51,8 @@ public class PythonResourceProvider extends ResourceProvider {
                     ResourceType.CHAT_MODEL, PythonChatModelSetup.class,
                     ResourceType.CHAT_MODEL_CONNECTION, 
PythonChatModelConnection.class,
                     ResourceType.EMBEDDING_MODEL, 
PythonEmbeddingModelSetup.class,
-                    ResourceType.EMBEDDING_MODEL_CONNECTION, 
PythonEmbeddingModelConnection.class);
+                    ResourceType.EMBEDDING_MODEL_CONNECTION, 
PythonEmbeddingModelConnection.class,
+                    ResourceType.VECTOR_STORE, 
PythonCollectionManageableVectorStore.class);
 
     protected PythonResourceAdapter pythonResourceAdapter;
 
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index ae95f40..7a9ad41 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -33,6 +33,10 @@ import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.SerializableResource;
 import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
 import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
 import org.apache.flink.agents.plan.actions.Action;
 import 
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
 import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
@@ -198,6 +202,33 @@ public class AgentPlanTest {
             return null;
         }
 
+        @Override
+        public Object toPythonDocuments(List<Document> documents) {
+            return null; // Test stub: no conversion is performed.
+        }
+
+        @Override
+        public List<Document> fromPythonDocuments(List<PyObject> 
pythonDocuments) {
+            return List.of(); // Test stub: always an empty, immutable list.
+        }
+
+        @Override
+        public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
+            return null; // Test stub: no conversion is performed.
+        }
+
+        @Override
+        public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
+                PyObject pythonVectorStoreQueryResult) {
+            return null; // Test stub: no conversion is performed.
+        }
+
+        @Override
+        public CollectionManageableVectorStore.Collection fromPythonCollection(
+                PyObject pythonCollection) {
+            return null; // Test stub: no conversion is performed.
+        }
+
         @Override
         public Object 
convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
             return null;
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/python/flink_agents/runtime/python_java_utils.py
index 6a3a84e..de223dd 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -26,6 +26,11 @@ from flink_agents.api.events.event import InputEvent
 from flink_agents.api.resource import Resource, ResourceType, 
get_resource_class
 from flink_agents.api.tools.tool import ToolMetadata
 from flink_agents.api.tools.utils import create_model_from_java_tool_schema_str
+from flink_agents.api.vector_stores.vector_store import (
+    Document,
+    VectorStoreQuery,
+    VectorStoreQueryMode,
+)
 from flink_agents.plan.resource_provider import JAVA_RESOURCE_MAPPING
 from flink_agents.runtime.java.java_resource_wrapper import (
     JavaGetResourceWrapper,
@@ -183,6 +188,36 @@ def update_java_chat_message(chat_message: ChatMessage, 
j_chat_message: Any) ->
 
     return chat_message.role.value
 
+def from_java_document(j_document: Any) -> Document:
+    """Convert a Java document to a Python document."""
+    document = Document(
+        content=j_document.getContent(),
+        id=j_document.getId(),
+        metadata=j_document.getMetadata(),
+    )
+    if j_document.getEmbedding():  # copied only when set and non-empty
+        document.embedding = list(j_document.getEmbedding())
+    return document
+
+def update_java_document(document: Document, j_document: Any) -> None:
+    """Update a Java document in place from a Python document."""
+    j_document.setContent(document.content)
+    j_document.setId(document.id)
+    j_document.setMetadata(document.metadata)
+    if document.embedding:  # a missing or empty embedding leaves the Java one untouched
+        j_document.setEmbedding(tuple(document.embedding))
+
+
+def from_java_vector_store_query(j_query: Any) -> VectorStoreQuery:
+    """Convert a Java vector store query to a Python query."""
+    return VectorStoreQuery(
+        mode=VectorStoreQueryMode(j_query.getMode().getValue()),  # look up by Java enum value
+        query_text=j_query.getQueryText(),
+        limit=j_query.getLimit(),
+        collection_name=j_query.getCollection(),
+        extra_args=j_query.getExtraArgs()
+    )
+
 def call_method(obj: Any, method_name: str, kwargs: Dict[str, Any]) -> Any:
     """Calls a method on `obj` by name and passes in positional and keyword 
arguments.
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
index b167e19..6c2c875 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java
@@ -25,10 +25,16 @@ import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
 import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
 import org.apache.flink.agents.api.tools.Tool;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
 import pemja.core.PythonInterpreter;
 import pemja.core.object.PyObject;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 
@@ -60,6 +66,13 @@ public class PythonResourceAdapterImpl implements 
PythonResourceAdapter {
 
     static final String TO_JAVA_CHAT_MESSAGE = PYTHON_MODULE_PREFIX + 
"update_java_chat_message";
 
+    static final String FROM_JAVA_DOCUMENT = PYTHON_MODULE_PREFIX + 
"from_java_document";
+
+    static final String UPDATE_JAVA_DOCUMENT = PYTHON_MODULE_PREFIX + 
"update_java_document";
+
+    static final String FROM_JAVA_VECTOR_STORE_QUERY =
+            PYTHON_MODULE_PREFIX + "from_java_vector_store_query";
+
     private final BiFunction<String, ResourceType, Resource> getResource;
     private final PythonInterpreter interpreter;
     private final JavaResourceAdapter javaResourceAdapter;
@@ -125,6 +138,50 @@ public class PythonResourceAdapterImpl implements 
PythonResourceAdapter {
         return chatMessage;
     }
 
+    /** Converts each Java document through the Python {@code from_java_document} helper. */
+    @Override
+    public Object toPythonDocuments(List<Document> documents) {
+        final List<Object> converted = new ArrayList<>();
+        for (Document javaDocument : documents) {
+            final Object pythonDocument = interpreter.invoke(FROM_JAVA_DOCUMENT, javaDocument);
+            converted.add(pythonDocument);
+        }
+        return converted;
+    }
+
+    /**
+     * Converts Python documents into Java {@link Document}s by reading their {@code content},
+     * {@code metadata} and {@code id} attributes.
+     *
+     * <p>NOTE(review): the {@code embedding} attribute is not copied; confirm this is intended.
+     *
+     * @param pythonDocuments the Python document objects to convert
+     * @return the converted documents, in the same order
+     */
+    @Override
+    public List<Document> fromPythonDocuments(List<PyObject> pythonDocuments) {
+        List<Document> documents = new ArrayList<>(pythonDocuments.size());
+        for (PyObject pythonDocument : pythonDocuments) {
+            @SuppressWarnings("unchecked") // the bridge returns a raw Map; keys assumed strings
+            Map<String, Object> metadata =
+                    (Map<String, Object>) pythonDocument.getAttr("metadata", Map.class);
+
+            // A document id is optional; assumes a Python None arrives here as null (TODO
+            // confirm). Keep it null rather than failing with a NullPointerException.
+            Object id = pythonDocument.getAttr("id");
+
+            documents.add(
+                    new Document(
+                            pythonDocument.getAttr("content").toString(),
+                            metadata,
+                            id == null ? null : id.toString()));
+        }
+        return documents;
+    }
+
+    @Override // Delegates to the Python helper from_java_vector_store_query.
+    public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
+        return interpreter.invoke(FROM_JAVA_VECTOR_STORE_QUERY, query);
+    }
+
+    @Override // Only the "documents" attribute of the Python result is converted.
+    public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
+            PyObject pythonVectorStoreQueryResult) {
+        List<PyObject> pythonDocuments =
+                (List<PyObject>) 
pythonVectorStoreQueryResult.getAttr("documents", List.class);
+        return new 
VectorStoreQueryResult(fromPythonDocuments(pythonDocuments));
+    }
+
+    @Override // Reads the "name" and "metadata" attributes of the Python collection.
+    public CollectionManageableVectorStore.Collection fromPythonCollection(
+            PyObject pythonCollection) {
+        return new CollectionManageableVectorStore.Collection(
+                pythonCollection.getAttr("name").toString(),
+                (Map<String, Object>) pythonCollection.getAttr("metadata", 
Map.class));
+    }
+
     @Override
     public Object convertToPythonTool(Tool tool) {
         return interpreter.invoke(FROM_JAVA_TOOL, tool);

Reply via email to