imbajin commented on code in PR #424:
URL: 
https://github.com/apache/incubator-hugegraph-doc/pull/424#discussion_r2480925428


##########
content/en/blog/hugegraph-ai/agentic_graphrag.md:
##########
@@ -0,0 +1,452 @@
+---
+date: 2025-10-29
+title: "Agentic GraphRAG"
+linkTitle: "Agentic GraphRAG"
+---
+
+# Project Background
+
+To address the problem of temporal discrepancies between model training data 
and real-world data, Retrieval-Augmented Generation (RAG) technology has 
emerged. RAG, as the name suggests, is a technique that retrieves relevant data 
from external data sources (Retrieval) to augment (Augmentation) the quality of 
the answers generated (Generation) by large language models.
+
+The earliest RAG employed a simple Retrieval-Generation architecture: take the 
user's question, perform some pre-processing (keyword extraction, etc.), and 
then use an Embedding Model to retrieve relevant information from a vast amount 
of data. That information is added to the Prompt fed to the large language 
model to enhance the quality of its responses.
+
+However, relying solely on semantic similarity matching to retrieve relevant 
information may not handle all situations, as the information that can enhance 
answer quality may not always be semantically similar to the question itself. A 
common example is: "Tell me the ontological view of the disciple of the 
philosopher who proposed that water is the origin of all things." Our data may 
not directly contain the answer to this question. The knowledge base might 
contain:
+
+1. Thales proposed that water is the origin of all things.
+2. Anaximander was a disciple of Thales.
+3. Anaximander identified the Apeiron, which has no formal definition, as the 
origin of all things.
+
+If we rely solely on semantic similarity matching, we are likely to retrieve 
only the first sentence to augment the large language model's answer. However, 
without the information from sentences 2 and 3, and if the large language 
model lacks philosophy-related knowledge in its training data, it will be 
unable to answer the question correctly and might even "hallucinate."
+
+Therefore, GraphRAG technology was developed. A typical GraphRAG involves two 
steps:
+
+1. Offline: We need to build a graph index for the knowledge base offline 
(converting unstructured data into structured data and storing it in a graph 
database).
+2. Online: When the GraphRAG system receives a user question, it can capture 
the relationships between different entities in the knowledge base using the 
graph database. Consequently, we can retrieve the three sentences above (the 
specific graph database index might look like the following example).
+
+<div style="text-align: center;">
+  <img src="/blog/images/images-server/agentic-background.png" alt="image" 
width="400">
+</div>
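+
To make the online retrieval step concrete, here is a toy sketch (hypothetical structures, not the actual HugeGraph index format) of how a 2-hop walk over a graph index recovers all three sentences when semantic search alone would surface only the first:

```python
from collections import deque

# Toy graph index: entity -> list of (neighbor entity, supporting sentence).
graph_index = {
    "Thales": [
        ("water", "Thales proposed that water is the origin of all things."),
        ("Anaximander", "Anaximander was a disciple of Thales."),
    ],
    "Anaximander": [
        ("Thales", "Anaximander was a disciple of Thales."),
        ("Apeiron", "Anaximander identified the Apeiron as the origin of all things."),
    ],
    "water": [],
    "Apeiron": [],
}

def retrieve(seed: str, hops: int = 2) -> list[str]:
    """Collect the supporting sentences reachable within `hops` edges of the seed."""
    seen = {seed}
    sentences: list[str] = []
    frontier = deque([(seed, 0)])
    while frontier:
        entity, depth = frontier.popleft()
        if depth == hops:
            continue
        for neighbor, sentence in graph_index.get(entity, []):
            if sentence not in sentences:
                sentences.append(sentence)
            if neighbor not in seen:
                seen.add(neighbor)
                frontier.append((neighbor, depth + 1))
    return sentences

# Vector search might match only the "Thales ... water" sentence; the graph
# walk from the matched entity also recovers the disciple relation and his view.
print(retrieve("Thales"))
```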
+
+However, GraphRAG itself also presents several challenges:
+
+1. How to construct the Graph Index is a complex task, and the quality of the 
Graph Index impacts the quality of the model's answers.
+2. The GraphRAG index construction process consumes a significant number of 
tokens.
+3. GraphRAG involves a variety of graph algorithms. How can we achieve the 
best Retrieval performance? (The configuration space is too large).
+
+This project primarily focuses on the third issue. We aim to leverage the 
generalization capabilities of large language models to automatically identify 
the user's intent within the question and then select the appropriate 
configuration (such as choosing the most suitable graph algorithm) to retrieve 
the corresponding data from the graph database to enhance the quality of the 
large language model's answer. This is the objective of Agentic GraphRAG.
+
+# Existing Workflow: Elegant Decoupling, Unfinished Parallelism
+
+The current HugeGraph-AI project has two core abstractions:
+
+1. Operator: Represents an "atomic operation unit" responsible for completing 
a specific subtask, such as vector index construction, vector similarity 
search, graph data related operations, and so on.
+2. Workflow: An execution flow composed of Operators as nodes in a 
**chain-like** structure. The pre-defined Workflows in the project correspond 
one-to-one with the project's demo use cases (e.g., GraphRAG, 
Vector-Similarity-Based RAG).
+
+The implementation of an Operator needs to adhere to the following interface:
+
+```python
+from abc import abstractmethod
+from typing import Any
+
+class Operator:
+    @abstractmethod
+    def run(self, context: dict[str, Any]) -> dict[str, Any]:
+        return {}
+```
+
+During actual runtime, an Operator accepts a dictionary-type context object as 
input, and the returned object is also a dictionary, which can be used as input 
for the next Operator. This design has one very clever aspect: it decouples the 
dependencies between different Operators from the specific implementation of 
the Operator itself. Each Operator is a relatively independent entity. If 
Operator A needs to rely on the output of Operator B, it only needs to check if 
the context object contains the output of Operator B. This is a loosely coupled 
design. The advantage is that we can easily combine different Operators freely. 
Assembling (configuring) a suitable Workflow to serve user requests based on 
different user inputs: isn't that precisely the goal of Agentic GraphRAG 
mentioned in the project background?
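+
A minimal sketch of this contract (the operator names here are hypothetical illustrations, not the actual HugeGraph-AI operators):

```python
from abc import ABC, abstractmethod
from typing import Any

class Operator(ABC):
    @abstractmethod
    def run(self, context: dict[str, Any]) -> dict[str, Any]:
        ...

class KeywordExtract(Operator):
    def run(self, context: dict[str, Any]) -> dict[str, Any]:
        # Publishes its result under a well-known key; knows nothing about
        # which operator (if any) will consume it.
        context["keywords"] = context["query"].lower().split()
        return context

class GraphQuery(Operator):
    def run(self, context: dict[str, Any]) -> dict[str, Any]:
        # Depends on KeywordExtract only through the context key, never the class.
        keywords = context.get("keywords", [])
        context["graph_results"] = [f"vertices matching {kw!r}" for kw in keywords]
        return context

# A chain-like workflow is just operators threaded through one dict.
context: dict[str, Any] = {"query": "Thales disciple"}
for op in (KeywordExtract(), GraphQuery()):
    context = op.run(context)
print(context["graph_results"])
```

Swapping GraphQuery for, say, a vector-search operator requires no change to KeywordExtract; that free recombination is exactly what the design enables.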
+
+```text
+👉🏼 Theoretically, the existing design can already transition smoothly to Agentic GraphRAG. However, the current design has several outstanding issues:
+    1. The existing scheduler only supports chain-like Workflows, missing potential parallelism.
+    2. The existing scheduler cannot reuse Workflows that are used repeatedly.
+```
+
+# Breaking Free from Chains: Embracing a New Architecture
+
+The previous scheduler left us with a useful idea: decoupling at the Operator 
level is a good design principle. However, the limited capabilities of the 
scheduler itself restrict the potential of the Workflow. Therefore, we plan 
to replace the scheduler in the project! After a brief survey of several 
different Workflow orchestration frameworks, we settled on the following 
criteria for selecting a scheduler (hereinafter we uniformly refer to the 
framework's orchestration object as a **Workflow**, and a Workflow consists of 
a series of **Task**s):
+
+1. Parallelism: Can different Tasks in a Workflow without data dependencies be 
automatically executed in parallel?
+2. Low Coupling: The specific implementation of a Task should be decoupled 
from the Workflow itself (in layman's terms: can a Task be a node in several 
different Workflows, and does the implementation of a Task need to include 
constraints related to dependencies on other Tasks?)
+3. Data Sharing: Since we want to decouple the dependencies between different 
Tasks, we need a Workflow-level data sharing mechanism to share data (for 
parameter passing) between different Tasks.
+4. Provides a Python Interface.
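+
To make the four criteria concrete, here is a framework-free sketch of the kind of scheduler we are looking for: tasks declare only data dependencies, independent tasks run concurrently, and a shared dict carries intermediate results (an illustration under these assumptions, not any framework's API):

```python
import asyncio
from typing import Any, Awaitable, Callable

TaskFn = Callable[[dict[str, Any]], Awaitable[Any]]

async def run_workflow(tasks: dict[str, tuple[TaskFn, set[str]]]) -> dict[str, Any]:
    """tasks maps a task name to (coroutine function, names of tasks it depends on)."""
    state: dict[str, Any] = {}  # Workflow-level shared data
    done: set[str] = set()
    while len(done) < len(tasks):
        # Every not-yet-run task whose dependencies are satisfied forms one
        # wave; the whole wave executes concurrently.
        ready = [n for n, (_, deps) in tasks.items() if n not in done and deps <= done]
        if not ready:
            raise RuntimeError("dependency cycle in workflow")
        results = await asyncio.gather(*(tasks[n][0](state) for n in ready))
        for name, result in zip(ready, results):
            state[name] = result
            done.add(name)
    return state

# Hypothetical tasks: the two fetches have no dependency on each other, so
# they run in parallel without any explicit parallel marker in the code.
async def fetch_a(state: dict[str, Any]) -> str:
    return "A"

async def fetch_b(state: dict[str, Any]) -> str:
    return "B"

async def merge(state: dict[str, Any]) -> str:
    return state["fetch_a"] + state["fetch_b"]

state = asyncio.run(run_workflow({
    "fetch_a": (fetch_a, set()),
    "fetch_b": (fetch_b, set()),
    "merge": (merge, {"fetch_a", "fetch_b"}),
}))
print(state["merge"])  # -> AB
```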
+
+## AI Framework Rumble
+
+We first turned our attention to the currently popular AI Workflow scheduling 
frameworks. Around the aforementioned dimensions, we investigated several 
different Workflow orchestration frameworks - LlamaIndex, Agno, Pydantic-Ai, 
and LangGraph.
+
+### LlamaIndex
+
+Regarding LlamaIndex, we will use a common example to illustrate the design 
philosophy of this framework.
+
+```python
+from workflows import Workflow, Context, step
+from workflows.events import StartEvent, StopEvent, Event
+
+class StepEvent(Event):
+    message: str
+
+class MyWorkflow(Workflow):
+
+    @step
+    async def step_one(self, ctx: Context, ev: StartEvent) -> StepEvent:
+        current_count = await ctx.store.get("count", default=0)
+        current_count += 1
+        await ctx.store.set("count", current_count)
+        print("step one called once")
+        return StepEvent(message="launch step two")
+
+    @step
+    async def step_two(self, ctx: Context, ev: StepEvent) -> StopEvent:
+        print("step two called once")
+        return StopEvent()
+```
+
+From this simple example, several problems are apparent. First, let's clarify 
a concept: a Workflow consists of two elements, Tasks and the dependencies 
between Tasks. Once these two elements are determined, a Workflow is 
established. In LlamaIndex, the implementation of each Task (the functions 
annotated with @step in the code) depends on the Workflow, because each Task 
must accept an Event object as a parameter, and that Event parameter is in 
effect a constraint encoding the dependencies between Tasks. Therefore, 
LlamaIndex does not have the low-coupling characteristic. We also found that 
making each Task a member function of the Workflow class violates our earlier 
requirement that a Task should be usable in multiple different Workflows. That 
said, LlamaIndex's data sharing and parallelism features are reasonably good; 
it is just that the programming interface built on the event-driven model 
sacrifices programming flexibility while ensuring ease of use.
+
+### Agno
+
+Still starting with the example:
+
+```python
+from typing import List
+
+from agno.workflow import Router, Step, Workflow
+
+def route_by_topic(step_input) -> List[Step]:
+    topic = step_input.input.lower()
+
+    if "tech" in topic:
+        return [Step(name="Tech Research", agent=tech_expert)]
+    elif "business" in topic:
+        return [Step(name="Business Research", agent=biz_expert)]
+    else:
+        return [Step(name="General Research", agent=generalist)]
+
+workflow = Workflow(
+    name="Expert Routing",
+    steps=[
+        Router(
+            name="Topic Router",
+            selector=route_by_topic,
+            choices=[tech_step, business_step, general_step]
+        ),
+        Step(name="Synthesis", agent=synthesizer),
+    ]
+)
+
+workflow.print_response("Latest developments in artificial intelligence and 
machine learning", markdown=True)
+```
+
+From this example, we can see that the binding relationship between the 
Workflow itself and the Task is determined by specifying the **steps** 
parameter. Theoretically, after defining a Task, we can use it in different 
Workflows. Agno's design meets our low-coupling standard.
+
+However, there are certain limitations in terms of data sharing and task 
parallelism.
+
+First, let's look at task parallelism, with the following example:
+
+```python
+workflow = Workflow(
+    name="Parallel Research Pipeline",
+    steps=[
+        Parallel(
+            Step(name="HackerNews Research", agent=hn_researcher),
+            Step(name="Web Research", agent=web_researcher),
+            Step(name="Academic Research", agent=academic_researcher),
+            name="Research Step"
+        ),
+        Step(name="Synthesis", agent=synthesizer),  # Combines the results and 
produces a report
+    ]
+)
+```
+
+Agno specifically designed a parallel interface, requiring us to explicitly 
define which tasks can be executed in parallel during static compilation 
(although Python doesn't really have a compilation time; it should be called 
"when writing code" haha 😀). However, the Workflow ultimately constructed by 
Agentic GraphRAG might be planned by the model at runtime, determined 
dynamically. Considering this, we believe Agno's parallelism feature does not 
meet our requirements.
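+
This matters because in Agentic GraphRAG the dependency structure may arrive as data at runtime (planned by the model), not as code written in advance. A framework-free sketch of deriving parallel groups from such a runtime plan (the task names are hypothetical):

```python
def parallel_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves: every task in a wave can run concurrently,
    because its dependencies were all completed in earlier waves."""
    done: set[str] = set()
    waves: list[list[str]] = []
    while len(done) < len(deps):
        wave = sorted(t for t, d in deps.items() if t not in done and d <= done)
        if not wave:
            raise ValueError("dependency cycle in plan")
        waves.append(wave)
        done |= set(wave)
    return waves

# Imagine the model produced this plan at runtime rather than at coding time:
plan = {
    "extract_keywords": set(),
    "vector_search": {"extract_keywords"},
    "graph_search": {"extract_keywords"},
    "synthesize": {"vector_search", "graph_search"},
}
print(parallel_waves(plan))
# -> [['extract_keywords'], ['graph_search', 'vector_search'], ['synthesize']]
```

With dependency-driven scheduling, the parallelism falls out of the plan itself; with an explicit `Parallel(...)` wrapper, it must be known when the code is written.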
+
+Next is data sharing. The Agno framework supports three different types of 
Tasks:
+
+1. Agent
+2. Team (composed of multiple Agents)
+3. Pure Function
+
+We inspected the latest version of the Agno source code at the time of our 
research and found that Agno supports state sharing only between Agents and 
Teams. Therefore, for those Tasks that are suitable for implementation with 
Pure Functions, we need to support an additional data-sharing mechanism. 
Consequently, Agno's data-sharing mechanism also does not meet our requirements.
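+
For completeness, the kind of bolt-on mechanism we would have had to add for pure-function Tasks might look like this (a hypothetical workaround sketched by us, not part of the Agno API):

```python
from typing import Any, Callable

# A shared dict standing in for the workflow-level state that (at the time of
# our survey) Agno provided to Agents and Teams but not to pure functions.
shared_state: dict[str, Any] = {}

def with_shared_state(key: str, fn: Callable[[dict[str, Any]], Any]) -> Callable[[], None]:
    """Wrap a pure function so it reads inputs from, and writes its output to,
    the shared dict under `key`."""
    def wrapper() -> None:
        shared_state[key] = fn(shared_state)
    return wrapper

# Two hypothetical pure-function tasks chained only through the shared dict.
def extract(state: dict[str, Any]) -> str:
    return state["raw"].strip().lower()

def count_words(state: dict[str, Any]) -> int:
    return len(state["extract"].split())

shared_state["raw"] = "  Agentic GraphRAG  "
for step in (with_shared_state("extract", extract),
             with_shared_state("count_words", count_words)):
    step()
print(shared_state["count_words"])  # -> 2
```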
+
+### Pydantic-Ai
+From the official documentation, we saw:
+
+<div style="text-align: center;">
+  <img src="/blog/images/images-server/agentic-pydantic.png" alt="image" 
width="800">
+</div>
+
+Surprisingly, the Pydantic-Ai framework doesn't support automatic parallelism 
at the Task level.
+
+Similar to the LlamaIndex framework, it adopts an event-driven programming 
model. Therefore, the Workflow and Task are not completely decoupled. However, 
it's worth noting that a Pydantic-Ai Task can be used in multiple different 
Workflows.
+
+### LangGraph
+
+Finally, we arrive at LangGraph. The reason we hadn't researched LangGraph 
earlier was that a teammate believed LangGraph itself was too heavy: in a 
previous version, even when using only part of LangGraph's functionality 
(scheduling), it was necessary to import LangGraph's full dependencies, which 
might make the project "heavy." Seeing phrases like "xxx is xxx times faster 
than LangGraph" in other open-source projects also influenced our 
decision-making. So it is only now that we are putting it on the research 
agenda.
+
+Let's take a look at a LangGraph example.
+
+```python
+from typing import TypedDict
+
+from langgraph.graph import StateGraph, START, END
+
+# `llm` below is a pre-configured chat model, elided in this simplified example.
+class State(TypedDict):
+    topic: str
+    joke: str
+    improved_joke: str
+
+# Nodes
+def generate_joke(state: State):
+    """First LLM call to generate initial joke"""
+
+    msg = llm.invoke(f"Write a short joke about {state['topic']}")
+    return {"joke": msg.content}
+
+def check_punchline(state: State):
+    """Gate function to check if the joke has a punchline"""
+
+    # Simple check - does the joke contain "?" or "!"
+    if "?" in state["joke"] or "!" in state["joke"]:
+        return "Pass"
+    return "Fail"
+
+def improve_joke(state: State):
+    """Second LLM call to improve the joke"""
+
+    msg = llm.invoke(f"Make this joke funnier by adding wordplay: 
{state['joke']}")
+    return {"improved_joke": msg.content}
+
+# Build workflow
+workflow = StateGraph(State)
+
+# Add nodes
+workflow.add_node("generate_joke", generate_joke)
+workflow.add_node("improve_joke", improve_joke)
+
+# Add edges to connect nodes
+workflow.add_edge(START, "generate_joke")
+workflow.add_conditional_edges(
+    "generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
+)
+workflow.add_edge("improve_joke", END)
+
+# Compile
+chain = workflow.compile()
+
+# Invoke
+state = chain.invoke({"topic": "cats"})
+```
+
+This is a simplified example from the official documentation. We can see that 
LangGraph, based on the Graph API, decouples the Workflow and Task by calling 
workflow.add_edge to specify the Workflow's dependencies. It also supports a 
global State as the Workflow's state for data sharing between Tasks. According 
to the official documentation, LangGraph supports automatic parallel execution 
of Tasks. We've finally found a Workflow orchestration framework that meets all 
of our requirements!
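+
The state-sharing model that won us over can be imitated in a few lines: each node receives the whole state but returns only a partial update, which the runtime merges back. This is a deliberate simplification of LangGraph's merge semantics for illustration, not its implementation:

```python
from typing import Any, Callable

State = dict[str, Any]
Node = Callable[[State], State]

def run_graph(state: State, nodes: list[Node]) -> State:
    """Run nodes in order; each returns a *partial* state that is merged in."""
    for node in nodes:
        state = {**state, **node(state)}
    return state

def generate_joke(state: State) -> State:
    # Returns only the key it produces, not the whole state.
    return {"joke": f"Why did the {state['topic']} cross the road?"}

def improve_joke(state: State) -> State:
    # Reads a key written by an earlier node via the shared state.
    return {"improved_joke": state["joke"] + " To get to the other side!"}

final = run_graph({"topic": "cat"}, [generate_joke, improve_joke])
print(final["improved_joke"])
```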
+
+### Summary
+
+|  | Parallelism | Low Coupling | Data Sharing | Python Interface |
+| --- | --- | --- | --- | --- |
+| LlamaIndex | Supported | Not Supported | Supported | Supported |
+| Agno | Supported but doesn't meet requirements | Supported | Supported but doesn't meet requirements | Supported |
+| Pydantic-Ai | Not Supported | Not Supported | Supported | Supported |

Review Comment:
   ````suggestion
   ```text
   🤔 The current version of the Scheduler implements its core responsibilities, providing a stable and efficient scheduling foundation for the entire system. Its main features include:
   ```
   ````
   💡 Suggest adding language identifier `text` to improve rendering



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

