This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 378f181 [hotfix] Refactor codes with latest API changes (#238)
378f181 is described below
commit 378f181a6f0f5914e4becc76a8be553b928f97aa
Author: Alan Z. <[email protected]>
AuthorDate: Tue Sep 30 13:45:27 2025 -0700
[hotfix] Refactor codes with latest API changes (#238)
---
python/flink_agents/examples/rag/__init__.py | 17 ++++
.../examples/rag/knowledge_base_setup.py | 81 +++++++++++++++
.../examples/{ => rag}/rag_agent_example.py | 113 ++++-----------------
3 files changed, 120 insertions(+), 91 deletions(-)
diff --git a/python/flink_agents/examples/rag/__init__.py
b/python/flink_agents/examples/rag/__init__.py
new file mode 100644
index 0000000..e154fad
--- /dev/null
+++ b/python/flink_agents/examples/rag/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/python/flink_agents/examples/rag/knowledge_base_setup.py
b/python/flink_agents/examples/rag/knowledge_base_setup.py
new file mode 100644
index 0000000..e574378
--- /dev/null
+++ b/python/flink_agents/examples/rag/knowledge_base_setup.py
@@ -0,0 +1,81 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+import chromadb
+
+from flink_agents.integrations.embedding_models.local.ollama_embedding_model
import (
+ OllamaEmbeddingModelConnection,
+)
+
+OLLAMA_EMBEDDING_MODEL = os.environ.get("OLLAMA_EMBEDDING_MODEL",
"nomic-embed-text")
+
+"""Utility for populating ChromaDB with sample knowledge documents for RAG
examples."""
+
+
+def populate_knowledge_base() -> None:
+ """Populate ChromaDB with sample knowledge documents using Ollama
embeddings."""
+ print("Populating ChromaDB with sample knowledge documents...")
+
+ # Create connections directly
+ embedding_connection = OllamaEmbeddingModelConnection()
+ chroma_client = chromadb.EphemeralClient()
+
+ # Get collection (create if doesn't exist)
+ collection_name = "example_knowledge_base"
+ collection = chroma_client.get_or_create_collection(
+ name=collection_name,
+ metadata=None,
+ )
+
+ # Sample documents to embed and store
+ documents = [
+ "Apache Flink is a stream processing framework for distributed,
high-performing, always-available, and accurate data streaming applications.",
+ "Flink provides exactly-once state consistency guarantees and
low-latency processing with high throughput.",
+ "Apache Flink Agents is an innovative Agentic AI framework built on
Apache Flink that enables distributed, stateful execution of AI agents.",
+ "Vector stores are databases optimized for storing and searching
high-dimensional vectors, commonly used in context retrieval applications.",
+ "Context retrieval combines information retrieval with language
generation to provide more accurate and contextual responses by finding
relevant information.",
+ ]
+
+ metadatas = [
+ {"topic": "flink", "source": "documentation"},
+ {"topic": "flink", "source": "documentation"},
+ {"topic": "flink_agents", "source": "documentation"},
+ {"topic": "vector_stores", "source": "ai_concepts"},
+ {"topic": "context_retrieval", "source": "ai_concepts"},
+ ]
+
+ # Generate real embeddings using Ollama
+ embeddings = []
+ for _i, doc in enumerate(documents):
+ embedding = embedding_connection.embed(doc,
model=OLLAMA_EMBEDDING_MODEL)
+ embeddings.append(embedding)
+
+ # Prepare data for ChromaDB
+ test_data = {
+ "documents": documents,
+ "embeddings": embeddings,
+ "metadatas": metadatas,
+ "ids": [f"doc{i + 1}" for i in range(len(documents))]
+ }
+
+ # Add documents to ChromaDB
+ collection.add(**test_data)
+
+ print(f"Knowledge base setup complete! Added {len(documents)} documents to
ChromaDB.")
diff --git a/python/flink_agents/examples/rag_agent_example.py
b/python/flink_agents/examples/rag/rag_agent_example.py
similarity index 62%
rename from python/flink_agents/examples/rag_agent_example.py
rename to python/flink_agents/examples/rag/rag_agent_example.py
index 5c1a7e8..92da5bd 100644
--- a/python/flink_agents/examples/rag_agent_example.py
+++ b/python/flink_agents/examples/rag/rag_agent_example.py
@@ -21,13 +21,10 @@ from flink_agents.api.agent import Agent
from flink_agents.api.chat_message import ChatMessage, MessageRole
from flink_agents.api.decorators import (
action,
- chat_model_connection,
chat_model_setup,
- embedding_model_connection,
embedding_model_setup,
prompt,
- vector_store_connection,
- vector_store_setup,
+ vector_store,
)
from flink_agents.api.events.chat_event import ChatRequestEvent,
ChatResponseEvent
from flink_agents.api.events.context_retrieval_event import (
@@ -39,6 +36,7 @@ from flink_agents.api.execution_environment import
AgentsExecutionEnvironment
from flink_agents.api.prompts.prompt import Prompt
from flink_agents.api.resource import ResourceDescriptor, ResourceType
from flink_agents.api.runner_context import RunnerContext
+from flink_agents.examples.rag.knowledge_base_setup import
populate_knowledge_base
from flink_agents.integrations.chat_models.ollama_chat_model import (
OllamaChatModelConnection,
OllamaChatModelSetup,
@@ -48,8 +46,7 @@ from
flink_agents.integrations.embedding_models.local.ollama_embedding_model imp
OllamaEmbeddingModelSetup,
)
from flink_agents.integrations.vector_stores.chroma.chroma_vector_store import
(
- ChromaVectorStoreConnection,
- ChromaVectorStoreSetup,
+ ChromaVectorStore,
)
OLLAMA_CHAT_MODEL = os.environ.get("OLLAMA_CHAT_MODEL", "qwen3:8b")
@@ -83,12 +80,6 @@ User Question:
Please provide a helpful answer based on the context provided."""
return Prompt.from_text(template)
- @embedding_model_connection
- @staticmethod
- def ollama_embedding_connection() -> ResourceDescriptor:
- """Embedding model connection for Ollama."""
- return ResourceDescriptor(clazz=OllamaEmbeddingModelConnection)
-
@embedding_model_setup
@staticmethod
def text_embedder() -> ResourceDescriptor:
@@ -99,29 +90,16 @@ Please provide a helpful answer based on the context
provided."""
model=OLLAMA_EMBEDDING_MODEL,
)
- @vector_store_connection
- @staticmethod
- def chroma_connection() -> ResourceDescriptor:
- """Vector store connection for ChromaDB (in-memory for demo)."""
- return ResourceDescriptor(clazz=ChromaVectorStoreConnection)
-
- @vector_store_setup
+ @vector_store
@staticmethod
def knowledge_base() -> ResourceDescriptor:
"""Vector store setup for knowledge base."""
return ResourceDescriptor(
- clazz=ChromaVectorStoreSetup,
- connection="chroma_connection",
+ clazz=ChromaVectorStore,
embedding_model="text_embedder",
collection="example_knowledge_base",
)
- @chat_model_connection
- @staticmethod
- def ollama_chat_connection() -> ResourceDescriptor:
- """Chat model connection for Ollama."""
- return ResourceDescriptor(clazz=OllamaChatModelConnection,
model=OLLAMA_CHAT_MODEL)
-
@chat_model_setup
@staticmethod
def chat_model() -> ResourceDescriptor:
@@ -129,6 +107,7 @@ Please provide a helpful answer based on the context
provided."""
return ResourceDescriptor(
clazz=OllamaChatModelSetup,
connection="ollama_chat_connection",
+ model=OLLAMA_CHAT_MODEL
)
@action(InputEvent)
@@ -147,7 +126,7 @@ Please provide a helpful answer based on the context
provided."""
@action(ContextRetrievalResponseEvent)
@staticmethod
def process_retrieved_context(
- event: ContextRetrievalResponseEvent, ctx: RunnerContext
+ event: ContextRetrievalResponseEvent, ctx: RunnerContext
) -> None:
"""Process retrieved context and create enhanced chat request."""
user_query = event.query
@@ -155,7 +134,7 @@ Please provide a helpful answer based on the context
provided."""
# Create context from retrieved documents
context_text = "\n\n".join(
- [f"{i+1}. {doc.content}" for i, doc in enumerate(retrieved_docs)]
+ [f"{i + 1}. {doc.content}" for i, doc in enumerate(retrieved_docs)]
)
# Get prompt resource and format it
@@ -181,87 +160,39 @@ Please provide a helpful answer based on the context
provided."""
ctx.send_event(OutputEvent(output=event.response.content))
-def populate_knowledge_base() -> None:
- """Populate ChromaDB with sample knowledge documents using Ollama
embeddings."""
- print("Populating ChromaDB with sample knowledge documents...")
-
- # Create connections directly
- embedding_connection = OllamaEmbeddingModelConnection()
- vector_store_connection = ChromaVectorStoreConnection()
-
- # Get collection (create if doesn't exist)
- collection_name = "example_knowledge_base"
- collection = vector_store_connection.client.get_or_create_collection(
- name=collection_name,
- metadata=None,
- )
-
- # Sample documents to embed and store
- documents = [
- "Apache Flink is a stream processing framework for distributed,
high-performing, always-available, and accurate data streaming applications.",
- "Flink provides exactly-once state consistency guarantees and
low-latency processing with high throughput.",
- "Apache Flink Agents is an innovative Agentic AI framework built on
Apache Flink that enables distributed, stateful execution of AI agents.",
- "Vector stores are databases optimized for storing and searching
high-dimensional vectors, commonly used in context retrieval applications.",
- "Context retrieval combines information retrieval with language
generation to provide more accurate and contextual responses by finding
relevant information.",
- ]
-
- metadatas = [
- {"topic": "flink", "source": "documentation"},
- {"topic": "flink", "source": "documentation"},
- {"topic": "flink_agents", "source": "documentation"},
- {"topic": "vector_stores", "source": "ai_concepts"},
- {"topic": "context_retrieval", "source": "ai_concepts"},
- ]
-
- # Generate real embeddings using Ollama
- embeddings = []
- for _i, doc in enumerate(documents):
- embedding = embedding_connection.embed(doc,
model=OLLAMA_EMBEDDING_MODEL)
- embeddings.append(embedding)
-
- # Prepare data for ChromaDB
- test_data = {
- "documents": documents,
- "embeddings": embeddings,
- "metadatas": metadatas,
- "ids": [f"doc{i+1}" for i in range(len(documents))]
- }
-
- # Add documents to ChromaDB
- collection.add(**test_data)
-
- print(f"Knowledge base setup complete! Added {len(documents)} documents to
ChromaDB.")
-
-
if __name__ == "__main__":
print("Starting RAG Example Agent...")
- # Initialize knowledge base with real data
+ # Populate vector store with sample documents
populate_knowledge_base()
agent = MyRAGAgent()
- env = AgentsExecutionEnvironment.get_execution_environment()
+ # Prepare example queries
input_list = []
-
- output_list = env.from_list(input_list).apply(agent).to_list()
-
test_queries = [
{"key": "001", "value": "What is Apache Flink?"},
{"key": "002", "value": "What is Apache Flink Agents?"},
{"key": "003", "value": "What is Python?"},
]
-
input_list.extend(test_queries)
- env.execute()
+ # Setup the Agents execution environment
+ agents_env = AgentsExecutionEnvironment.get_execution_environment()
+
+ # Setup Ollama embedding and chat model connections
+ agents_env.add_resource("ollama_embedding_connection",
ResourceDescriptor(clazz=OllamaEmbeddingModelConnection))
+ agents_env.add_resource("ollama_chat_connection",
ResourceDescriptor(clazz=OllamaChatModelConnection))
- print("\n" + "="*50)
+ output_list = agents_env.from_list(input_list).apply(agent).to_list()
+
+ agents_env.execute()
+
+ print("\n" + "=" * 50)
print("RAG Example Results:")
- print("="*50)
+ print("=" * 50)
for output in output_list:
for key, value in output.items():
print(f"\n[{key}] Response: {value}")
print("-" * 40)
-