This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 1c829cb7 [dependency] Bump flink to 1.20.5 and 2.1.3 (#842)
1c829cb7 is described below
commit 1c829cb7be9c61a2e904a3600b3ebe099aa6392c
Author: Wenjin Xie <[email protected]>
AuthorDate: Sat Jun 13 22:09:45 2026 +0800
[dependency] Bump flink to 1.20.5 and 2.1.3 (#842)
* [fix] Keep numpy off the async pool for cross-language RAG queries
A Python vector store's query path runs numpy (e.g. chroma's embedding
normalization). numpy releases and re-acquires the GIL during the
conversion,
which deadlocks on an async pemja worker thread since pemja keeps a single
PyThreadState. Split the async RAG query so only the numpy normalization
runs
on the operator thread: embed and query stay async, normalize is sync.
- PythonVectorStore: resolve the embedding model in open(); add embedQuery /
normalizeEmbedding / queryNormalized hooks; forward pre-computed vectors.
- BaseVectorStore: expose getEmbeddingModel to subclasses.
- ContextRetrievalAction: async embed -> sync normalize -> async query.
- ChromaVectorStore: add _normalize_embeddings; query accepts
pre-normalized.
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
* [dependency] Bump flink to 1.20.5 and 2.1.3
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../agents/api/vectorstores/BaseVectorStore.java | 2 +-
.../api/vectorstores/python/PythonVectorStore.java | 139 +++++++++++++--------
.../PythonCollectionManageableVectorStoreTest.java | 26 ++--
dist/pom.xml | 4 +-
.../pom.xml | 8 +-
.../plan/actions/ContextRetrievalAction.java | 106 +++++++++++++---
pom.xml | 4 +-
.../flink_agents/api/vector_stores/vector_store.py | 10 ++
.../vector_stores/chroma/chroma_vector_store.py | 18 +++
9 files changed, 224 insertions(+), 93 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
index 442e705e..d0fc4ce7 100644
---
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
+++
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java
@@ -339,7 +339,7 @@ public abstract class BaseVectorStore extends Resource {
}
}
- private BaseEmbeddingModelSetup getEmbeddingModel() {
+ protected BaseEmbeddingModelSetup getEmbeddingModel() {
if (embeddingModel == null) {
throw new IllegalStateException(
"No embedding model configured on this vector store. "
diff --git
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
index 6b2d5d71..69025cc1 100644
---
a/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
+++
b/api/src/main/java/org/apache/flink/agents/api/vectorstores/python/PythonVectorStore.java
@@ -24,13 +24,12 @@ import
org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
import org.apache.flink.agents.api.vectorstores.Document;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
import pemja.core.object.PyObject;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -45,10 +44,11 @@ import java.util.Map;
* <p>This class serves as a connection layer between Java and Python vector
store environments,
* enabling seamless integration of Python-based vector stores within Java
applications.
*
- * <p>The {@code *Embedding} hooks ({@link #queryEmbedding}, {@link
#addEmbedding}, {@link
- * #updateEmbedding}) are no-ops here: this bridge forwards each public method
directly to its
- * Python counterpart, which already handles auto-embedding internally, so the
Java auto-embed path
- * in {@link BaseVectorStore} is not used.
+ * <p>Embedding is generated on the Java side via {@link BaseVectorStore}'s
public add/update/query;
+ * the {@code *Embedding} hooks then forward the pre-computed vectors to the
Python {@code
+ * _add_embedding}/{@code _update_embedding}/{@code _query_embedding}. This
keeps each store
+ * operation a single Java->Python crossing, avoiding a Python->Java re-entry
that deadlocks when
+ * run on the async pool thread.
*/
public class PythonVectorStore extends BaseVectorStore implements
PythonResourceWrapper {
protected final PyObject vectorStore;
@@ -74,52 +74,14 @@ public class PythonVectorStore extends BaseVectorStore
implements PythonResource
}
@Override
- public void open() {
+ public void open() throws Exception {
+ // Resolve the Java-side embedding model so embeddings are generated
on the mailbox thread
+ // (single Java->Python crossing per op). Without this, add/query
would re-embed inside
+ // Python and re-enter Java, which deadlocks when run on the async
pool thread.
+ super.open();
adapter.callMethod(vectorStore, "open", Collections.emptyMap());
}
- @Override
- @SuppressWarnings("unchecked")
- public List<String> add(
- List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
- throws IOException {
- Object pythonDocuments = adapter.toPythonDocuments(documents);
-
- Map<String, Object> kwargs = new HashMap<>(extraArgs);
- kwargs.put("documents", pythonDocuments);
-
- if (collection != null) {
- kwargs.put("collection_name", collection);
- }
-
- return (List<String>) adapter.callMethod(vectorStore, "add", kwargs);
- }
-
- @Override
- public void update(
- List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
- throws IOException {
- Object pythonDocuments = adapter.toPythonDocuments(documents);
-
- Map<String, Object> kwargs = new HashMap<>(extraArgs);
- kwargs.put("documents", pythonDocuments);
-
- if (collection != null) {
- kwargs.put("collection_name", collection);
- }
-
- adapter.callMethod(vectorStore, "update", kwargs);
- }
-
- @Override
- public VectorStoreQueryResult query(VectorStoreQuery query) {
- Object pythonQuery = adapter.toPythonVectorStoreQuery(query);
-
- PyObject pythonResult = (PyObject) vectorStore.invokeMethod("query",
pythonQuery);
-
- return adapter.fromPythonVectorStoreQueryResult(pythonResult);
- }
-
@Override
@SuppressWarnings("unchecked")
public List<Document> get(
@@ -170,30 +132,101 @@ public class PythonVectorStore extends BaseVectorStore
implements PythonResource
@Override
public Map<String, Object> getStoreKwargs() {
- return Map.of();
+ return new HashMap<>();
}
@Override
+ @SuppressWarnings("unchecked")
public List<Document> queryEmbedding(
float[] embedding,
int limit,
@Nullable String collection,
@Nullable Map<String, Object> filters,
Map<String, Object> args) {
- return List.of();
+ Map<String, Object> kwargs = new HashMap<>(args);
+ // pemja maps float[] to a Python tuple, which Chroma rejects; pass a
list instead.
+ List<Float> embeddingList = new ArrayList<>(embedding.length);
+ for (float v : embedding) {
+ embeddingList.add(v);
+ }
+ kwargs.put("embedding", embeddingList);
+ kwargs.put("limit", limit);
+ if (collection != null) {
+ kwargs.put("collection_name", collection);
+ }
+ if (filters != null) {
+ kwargs.put("filters", filters);
+ }
+ Object pythonDocuments = adapter.callMethod(vectorStore,
"_query_embedding", kwargs);
+ return adapter.fromPythonDocuments((List<PyObject>) pythonDocuments);
+ }
+
+ /** Embed query text via the configured model (no numpy, so it stays on
the async pool). */
+ public float[] embedQuery(String text) {
+ return getEmbeddingModel().embed(text);
+ }
+
+ /**
+ * Convert the raw embedding to the Python store's native vector form.
This runs the numpy
+ * conversion on the mailbox thread: numpy releases/re-acquires the GIL
during the copy, and
+ * pemja keeps a single PyThreadState, so doing it on an async worker
thread can stall the
+ * interpreter (observed as a hang in CI; benign with spare cores
locally). The returned Python
+ * object is forwarded back into {@link #queryNormalized}. See
+ * https://github.com/apache/flink-agents/issues/844.
+ */
+ public Object normalizeEmbedding(float[] embedding) {
+ List<Float> embeddingList = new ArrayList<>(embedding.length);
+ for (float v : embedding) {
+ embeddingList.add(v);
+ }
+ Map<String, Object> kwargs = new HashMap<>();
+ kwargs.put("embeddings", embeddingList);
+ return adapter.callMethod(vectorStore, "_normalize_embeddings",
kwargs);
+ }
+
+ /** Query with a pre-normalized embedding; numpy-free, so it stays on the
async pool. */
+ @SuppressWarnings("unchecked")
+ public List<Document> queryNormalized(
+ Object normalizedEmbedding,
+ int limit,
+ @Nullable String collection,
+ @Nullable Map<String, Object> filters,
+ Map<String, Object> args) {
+ Map<String, Object> kwargs = new HashMap<>(args);
+ kwargs.put("embedding", normalizedEmbedding);
+ kwargs.put("limit", limit);
+ if (collection != null) {
+ kwargs.put("collection_name", collection);
+ }
+ if (filters != null) {
+ kwargs.put("filters", filters);
+ }
+ Object pythonDocuments = adapter.callMethod(vectorStore,
"_query_embedding", kwargs);
+ return adapter.fromPythonDocuments((List<PyObject>) pythonDocuments);
}
@Override
+ @SuppressWarnings("unchecked")
public List<String> addEmbedding(
List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs)
throws IOException {
- return List.of();
+ Map<String, Object> kwargs = new HashMap<>(extraArgs);
+ kwargs.put("documents", adapter.toPythonDocuments(documents));
+ if (collection != null) {
+ kwargs.put("collection_name", collection);
+ }
+ return (List<String>) adapter.callMethod(vectorStore,
"_add_embedding", kwargs);
}
@Override
public void updateEmbedding(
List<Document> documents, @Nullable String collection, Map<String,
Object> extraArgs) {
- // no-op; Python forwards public update() directly
+ Map<String, Object> kwargs = new HashMap<>(extraArgs);
+ kwargs.put("documents", adapter.toPythonDocuments(documents));
+ if (collection != null) {
+ kwargs.put("collection_name", collection);
+ }
+ adapter.callMethod(vectorStore, "_update_embedding", kwargs);
}
@Override
diff --git
a/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
b/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
index 0a5e04cb..37b27f85 100644
---
a/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
+++
b/api/src/test/java/org/apache/flink/agents/api/vectorstores/python/PythonCollectionManageableVectorStoreTest.java
@@ -161,17 +161,19 @@ public class PythonCollectionManageableVectorStoreTest {
@Test
void testAddDocuments() throws Exception {
- List<Document> documents =
- Arrays.asList(
- new Document("content1", Map.of("key", "value1"),
"doc1"),
- new Document("content2", Map.of("key", "value2"),
"doc2"));
+ Document d1 = new Document("content1", Map.of("key", "value1"),
"doc1");
+ Document d2 = new Document("content2", Map.of("key", "value2"),
"doc2");
+ // Pre-computed embeddings so the Java auto-embed path is skipped (no
model configured).
+ d1.setEmbedding(new float[] {0.1f, 0.2f});
+ d2.setEmbedding(new float[] {0.3f, 0.4f});
+ List<Document> documents = Arrays.asList(d1, d2);
String collection = "test_collection";
Map<String, Object> extraArgs = Map.of("batch_size", 10);
List<String> expectedIds = Arrays.asList("doc1", "doc2");
when(mockAdapter.toPythonDocuments(documents)).thenReturn(new
Object());
- when(mockAdapter.callMethod(eq(mockVectorStore), eq("add"),
any(Map.class)))
+ when(mockAdapter.callMethod(eq(mockVectorStore), eq("_add_embedding"),
any(Map.class)))
.thenReturn(expectedIds);
List<String> result = vectorStore.add(documents, collection,
extraArgs);
@@ -184,7 +186,7 @@ public class PythonCollectionManageableVectorStoreTest {
verify(mockAdapter)
.callMethod(
eq(mockVectorStore),
- eq("add"),
+ eq("_add_embedding"),
argThat(
kwargs -> {
assertThat(kwargs).containsKey("documents");
@@ -196,10 +198,12 @@ public class PythonCollectionManageableVectorStoreTest {
@Test
void testUpdateDocuments() throws Exception {
- List<Document> documents =
- Arrays.asList(
- new Document("c1", Map.of("k", "v1"), "doc1"),
- new Document("c2", Map.of("k", "v2"), "doc2"));
+ Document d1 = new Document("c1", Map.of("k", "v1"), "doc1");
+ Document d2 = new Document("c2", Map.of("k", "v2"), "doc2");
+ // Pre-computed embeddings so the Java auto-embed path is skipped (no
model configured).
+ d1.setEmbedding(new float[] {0.1f, 0.2f});
+ d2.setEmbedding(new float[] {0.3f, 0.4f});
+ List<Document> documents = Arrays.asList(d1, d2);
String collection = "test_collection";
Map<String, Object> extraArgs = Map.of("batch_size", 5);
@@ -211,7 +215,7 @@ public class PythonCollectionManageableVectorStoreTest {
verify(mockAdapter)
.callMethod(
eq(mockVectorStore),
- eq("update"),
+ eq("_update_embedding"),
argThat(
kwargs -> {
assertThat(kwargs).containsKey("documents");
diff --git a/dist/pom.xml b/dist/pom.xml
index 9e0c0c6c..0a72542a 100644
--- a/dist/pom.xml
+++ b/dist/pom.xml
@@ -30,9 +30,9 @@ under the License.
<packaging>pom</packaging>
<properties>
- <flink.1.20.version>1.20.4</flink.1.20.version>
+ <flink.1.20.version>1.20.5</flink.1.20.version>
<flink.2.0.version>2.0.2</flink.2.0.version>
- <flink.2.1.version>2.1.2</flink.2.1.version>
+ <flink.2.1.version>2.1.3</flink.2.1.version>
<flink.2.2.version>2.2.1</flink.2.2.version>
</properties>
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 119edaf8..bfbdf1e6 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -29,10 +29,10 @@ under the License.
<name>Flink Agents : E2E Tests: Integration</name>
<properties>
- <flink.1.20.version>1.20.3</flink.1.20.version>
- <flink.2.0.version>2.0.1</flink.2.0.version>
- <flink.2.1.version>2.1.1</flink.2.1.version>
- <flink.2.2.version>2.2.0</flink.2.2.version>
+ <flink.1.20.version>1.20.5</flink.1.20.version>
+ <flink.2.0.version>2.0.2</flink.2.0.version>
+ <flink.2.1.version>2.1.3</flink.2.1.version>
+ <flink.2.2.version>2.2.1</flink.2.2.version>
<flink.version>${flink.2.2.version}</flink.version>
<flink.agents.dist.artifactId>flink-agents-dist-flink-2.2</flink.agents.dist.artifactId>
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
index 28a0853b..6b91c05c 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
@@ -26,6 +26,7 @@ import
org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.vectorstores.BaseVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
import org.apache.flink.agents.api.vectorstores.python.PythonVectorStore;
@@ -71,26 +72,34 @@ public class ContextRetrievalAction {
contextRetrievalRequestEvent.getQuery(),
contextRetrievalRequestEvent.getMaxResults());
- DurableCallable<VectorStoreQueryResult> callable =
- new DurableCallable<VectorStoreQueryResult>() {
- @Override
- public String getId() {
- return "rag-async";
- }
-
- @Override
- public Class<VectorStoreQueryResult> getResultClass() {
- return VectorStoreQueryResult.class;
- }
-
- @Override
- public VectorStoreQueryResult call() throws Exception {
- return vectorStore.query(vectorStoreQuery);
- }
- };
-
- VectorStoreQueryResult result =
- ragAsync ? ctx.durableExecuteAsync(callable) :
ctx.durableExecute(callable);
+ final VectorStoreQueryResult result;
+ if (ragAsync && vectorStore instanceof PythonVectorStore) {
+ // A Python store's query path runs numpy, which can stall on
the async pool
+ // (pemja keeps a single PyThreadState; numpy
releasing/re-acquiring the GIL on a
+ // worker thread hangs intermittently — seen in CI, fine
locally with spare cores).
+ // Keep only that numpy step on the mailbox thread; embed and
query stay async.
+ result = queryPythonAsync((PythonVectorStore) vectorStore,
vectorStoreQuery, ctx);
+ } else {
+ DurableCallable<VectorStoreQueryResult> callable =
+ new DurableCallable<VectorStoreQueryResult>() {
+ @Override
+ public String getId() {
+ return "rag-async";
+ }
+
+ @Override
+ public Class<VectorStoreQueryResult>
getResultClass() {
+ return VectorStoreQueryResult.class;
+ }
+
+ @Override
+ public VectorStoreQueryResult call() throws
Exception {
+ return vectorStore.query(vectorStoreQuery);
+ }
+ };
+ result =
+ ragAsync ? ctx.durableExecuteAsync(callable) :
ctx.durableExecute(callable);
+ }
ctx.sendEvent(
new ContextRetrievalResponseEvent(
@@ -99,4 +108,61 @@ public class ContextRetrievalAction {
result.getDocuments()));
}
}
+
+ /**
+ * Run a Python vector-store RAG query while keeping numpy off the async
pool: embed async,
+ * normalize the embedding synchronously on the mailbox thread (numpy on a
worker thread can
+ * stall under pemja's single PyThreadState), then query async with the
pre-normalized vector.
+ * See https://github.com/apache/flink-agents/issues/844.
+ */
+ private static VectorStoreQueryResult queryPythonAsync(
+ PythonVectorStore store, VectorStoreQuery query, RunnerContext
ctx) throws Exception {
+ final float[] embedding =
+ ctx.durableExecuteAsync(
+ new DurableCallable<float[]>() {
+ @Override
+ public String getId() {
+ return "rag-embed";
+ }
+
+ @Override
+ public Class<float[]> getResultClass() {
+ return float[].class;
+ }
+
+ @Override
+ public float[] call() {
+ return store.embedQuery(query.getQueryText());
+ }
+ });
+
+ final Object normalized = store.normalizeEmbedding(embedding);
+
+ final List<Document> documents =
+ ctx.durableExecuteAsync(
+ new DurableCallable<List<Document>>() {
+ @Override
+ public String getId() {
+ return "rag-query";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class<List<Document>> getResultClass() {
+ return (Class<List<Document>>) (Class<?>)
List.class;
+ }
+
+ @Override
+ public List<Document> call() {
+ return store.queryNormalized(
+ normalized,
+ query.getLimit(),
+ query.getCollection(),
+ query.getFilters(),
+ store.getStoreKwargs());
+ }
+ });
+
+ return new VectorStoreQueryResult(documents);
+ }
}
diff --git a/pom.xml b/pom.xml
index e77b0a51..4728fa58 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,12 +41,12 @@ under the License.
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<spotless.version>2.27.1</spotless.version>
<spotless.skip>false</spotless.skip>
- <flink.version>2.2.0</flink.version>
+ <flink.version>2.2.1</flink.version>
<kafka.version>4.0.0</kafka.version>
<fluss.version>0.9.0-incubating</fluss.version>
<junit5.version>5.10.1</junit5.version>
<jackson.version>2.18.2</jackson.version>
- <pemja.version>0.5.5</pemja.version>
+ <pemja.version>0.5.7</pemja.version>
<log4j2.version>2.23.1</log4j2.version>
<slf4j.version>1.7.36</slf4j.version>
<assertj.version>3.27.7</assertj.version>
diff --git a/python/flink_agents/api/vector_stores/vector_store.py
b/python/flink_agents/api/vector_stores/vector_store.py
index d16d00c7..2cf04c49 100644
--- a/python/flink_agents/api/vector_stores/vector_store.py
+++ b/python/flink_agents/api/vector_stores/vector_store.py
@@ -397,6 +397,16 @@ class BaseVectorStore(Resource, ABC):
**kwargs: Vector store specific parameters.
"""
+ @staticmethod
+ def _normalize_embeddings(embeddings: list[float]) -> Any:
+ """Pre-process a query embedding before the search call.
+
+ Hook for backends whose query path performs CPU/numpy work that must
run
+ on the mailbox thread rather than an async cross-language worker (see
the
+ ChromaDB override). Default is identity.
+ """
+ return embeddings
+
@abstractmethod
def _query_embedding(
self,
diff --git
a/python/flink_agents/integrations/vector_stores/chroma/chroma_vector_store.py
b/python/flink_agents/integrations/vector_stores/chroma/chroma_vector_store.py
index 7028ccd0..29dacf90 100644
---
a/python/flink_agents/integrations/vector_stores/chroma/chroma_vector_store.py
+++
b/python/flink_agents/integrations/vector_stores/chroma/chroma_vector_store.py
@@ -21,6 +21,7 @@ from typing import Any, Dict, Generator, List
import chromadb
from chromadb import ClientAPI as ChromaClient
from chromadb import CloudClient
+from chromadb.api.types import normalize_embeddings
from chromadb.config import Settings
from pydantic import Field
from typing_extensions import override
@@ -306,6 +307,20 @@ class ChromaVectorStore(CollectionManageableVectorStore):
metadatas=[doc.metadata for doc in documents],
)
+ @staticmethod
+ @override
+ def _normalize_embeddings(embeddings: List[float]) -> Any:
+ """Convert the raw embedding to chroma's numpy form on the caller's
thread.
+
+ chroma's query normally runs this np.array conversion internally. numpy
+ releases and re-acquires the GIL during the copy, which can stall on an
+ async pemja worker thread (pemja keeps a single PyThreadState) — seen
as a
+ hang in CI, benign locally with spare cores. Running it here lets the
+ mailbox thread do the numpy step, so the async query sees a ready
ndarray.
+ See https://github.com/apache/flink-agents/issues/844.
+ """
+ return normalize_embeddings([embeddings])[0]
+
@override
def _query_embedding(
self,
@@ -315,6 +330,9 @@ class ChromaVectorStore(CollectionManageableVectorStore):
filters: Dict[str, Any] | None = None,
**kwargs: Any,
) -> List[Document]:
+ # ``embedding`` may be a pre-normalized ndarray (async path) or a raw
list
+ # (sync path); chroma takes the ndarray branch for the former,
avoiding any
+ # numpy work on this thread.
collection = self._resolve_collection(collection_name, kwargs)
results = collection.query(
query_embeddings=[embedding],