xintongsong commented on code in PR #126:
URL: https://github.com/apache/flink-agents/pull/126#discussion_r2300102467


##########
python/flink_agents/plan/actions/chat_model_action.py:
##########
@@ -25,59 +25,65 @@
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.function import PythonFunction
 
+TOOL_CALL_CONTEXT = "_TOOL_CALL_CONTEXT"
+
 
 def process_chat_request_or_tool_response(event: Event, ctx: RunnerContext) -> 
None:
     """Built-in action for processing a chat request or tool response."""
+    short_term_memory = ctx.get_short_term_memory()
     if isinstance(event, ChatRequestEvent):
         chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
         # TODO: support async execution of chat.
         response = chat_model.chat(event.messages)
+
+        request_id = event.id
         # call tool
         if len(response.tool_calls) > 0:
-            for tool_call in response.tool_calls:
-                # store the tool call context in short term memory
-                state = ctx.get_short_term_memory()
-                # TODO: Because memory doesn't support remove currently, so we 
use
-                #  dict to store tool context in memory and remove the specific
-                #  tool context from dict after consuming. This will cause some
-                #  overhead for we need get the whole dict and overwrite it to 
memory
-                #  each time we update a specific tool context.
-                #  After memory supports remove, we can use
-                #  "__tool_context/tool_call_id" to store and remove the 
specific tool
-                #  context directly.
-                if not state.is_exist("__tool_context"):
-                    state.set("__tool_context", {})
-                tool_context = state.get("__tool_context")
-                tool_call_id = tool_call["id"]
-                tool_context[tool_call_id] = event
-                tool_context[tool_call_id].messages.append(response)
-                state.set("__tool_context", tool_context)
-                ctx.send_event(
-                    ToolRequestEvent(
-                        id=tool_call_id,
-                        tool=tool_call["function"]["name"],
-                        kwargs=tool_call["function"]["arguments"],
-                    )
+            # TODO: Because memory doesn't support remove currently, so we use
+            #  dict to store tool context in memory and remove the specific
+            #  tool context from dict after consuming. This will cause write 
and
+            #  read amplification for we need get the whole dict and overwrite 
it
+            #  to memory each time we update a specific tool context.
+            #  After memory supports remove, we can use 
"TOOL_CALL_CONTEXT/request_id"
+            #  to store and remove the specific tool context directly.
+            # save tool call context
+            tool_call_context = short_term_memory.get(TOOL_CALL_CONTEXT)
+            if not tool_call_context:
+                tool_call_context = {}
+            if request_id not in tool_call_context:
+                tool_call_context[request_id] = event
+            # append response to request event messages
+            tool_call_context[request_id].messages.append(response)

Review Comment:
   Why use the event as the context? It's counterintuitive that we are 
modifying a received event.



##########
python/flink_agents/api/execution_environment.py:
##########
@@ -170,3 +197,122 @@ def from_table(
     @abstractmethod
     def execute(self) -> None:
         """Execute agent individually."""
+
+    def add_action(

Review Comment:
   Do we need this on the execution environment?



##########
python/flink_agents/plan/actions/chat_model_action.py:
##########
@@ -25,59 +25,65 @@
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.function import PythonFunction
 
+TOOL_CALL_CONTEXT = "_TOOL_CALL_CONTEXT"
+
 
 def process_chat_request_or_tool_response(event: Event, ctx: RunnerContext) -> 
None:
     """Built-in action for processing a chat request or tool response."""
+    short_term_memory = ctx.get_short_term_memory()
     if isinstance(event, ChatRequestEvent):
         chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
         # TODO: support async execution of chat.
         response = chat_model.chat(event.messages)
+
+        request_id = event.id
         # call tool
         if len(response.tool_calls) > 0:
-            for tool_call in response.tool_calls:
-                # store the tool call context in short term memory
-                state = ctx.get_short_term_memory()
-                # TODO: Because memory doesn't support remove currently, so we 
use
-                #  dict to store tool context in memory and remove the specific
-                #  tool context from dict after consuming. This will cause some
-                #  overhead for we need get the whole dict and overwrite it to 
memory
-                #  each time we update a specific tool context.
-                #  After memory supports remove, we can use
-                #  "__tool_context/tool_call_id" to store and remove the 
specific tool
-                #  context directly.
-                if not state.is_exist("__tool_context"):
-                    state.set("__tool_context", {})
-                tool_context = state.get("__tool_context")
-                tool_call_id = tool_call["id"]
-                tool_context[tool_call_id] = event
-                tool_context[tool_call_id].messages.append(response)
-                state.set("__tool_context", tool_context)
-                ctx.send_event(
-                    ToolRequestEvent(
-                        id=tool_call_id,
-                        tool=tool_call["function"]["name"],
-                        kwargs=tool_call["function"]["arguments"],
-                    )
+            # TODO: Because memory doesn't support remove currently, so we use
+            #  dict to store tool context in memory and remove the specific
+            #  tool context from dict after consuming. This will cause write 
and
+            #  read amplification for we need get the whole dict and overwrite 
it
+            #  to memory each time we update a specific tool context.
+            #  After memory supports remove, we can use 
"TOOL_CALL_CONTEXT/request_id"
+            #  to store and remove the specific tool context directly.
+            # save tool call context
+            tool_call_context = short_term_memory.get(TOOL_CALL_CONTEXT)
+            if not tool_call_context:
+                tool_call_context = {}
+            if request_id not in tool_call_context:
+                tool_call_context[request_id] = event
+            # append response to request event messages
+            tool_call_context[request_id].messages.append(response)
+            # update short term memory
+            short_term_memory.set(TOOL_CALL_CONTEXT, tool_call_context)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=event.id,
+                    tool_calls=response.tool_calls,
                 )
-
+            )
         # send response
         else:
+            # clear tool call context related to specific request id

Review Comment:
   Is it possible that the chat response contains both text and tool calls?



##########
python/flink_agents/plan/actions/chat_model_action.py:
##########
@@ -25,59 +25,65 @@
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.function import PythonFunction
 
+TOOL_CALL_CONTEXT = "_TOOL_CALL_CONTEXT"
+
 
 def process_chat_request_or_tool_response(event: Event, ctx: RunnerContext) -> 
None:
     """Built-in action for processing a chat request or tool response."""
+    short_term_memory = ctx.get_short_term_memory()
     if isinstance(event, ChatRequestEvent):
         chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
         # TODO: support async execution of chat.
         response = chat_model.chat(event.messages)
+
+        request_id = event.id
         # call tool
         if len(response.tool_calls) > 0:
-            for tool_call in response.tool_calls:
-                # store the tool call context in short term memory
-                state = ctx.get_short_term_memory()
-                # TODO: Because memory doesn't support remove currently, so we 
use
-                #  dict to store tool context in memory and remove the specific
-                #  tool context from dict after consuming. This will cause some
-                #  overhead for we need get the whole dict and overwrite it to 
memory
-                #  each time we update a specific tool context.
-                #  After memory supports remove, we can use
-                #  "__tool_context/tool_call_id" to store and remove the 
specific tool
-                #  context directly.
-                if not state.is_exist("__tool_context"):
-                    state.set("__tool_context", {})
-                tool_context = state.get("__tool_context")
-                tool_call_id = tool_call["id"]
-                tool_context[tool_call_id] = event
-                tool_context[tool_call_id].messages.append(response)
-                state.set("__tool_context", tool_context)
-                ctx.send_event(
-                    ToolRequestEvent(
-                        id=tool_call_id,
-                        tool=tool_call["function"]["name"],
-                        kwargs=tool_call["function"]["arguments"],
-                    )
+            # TODO: Because memory doesn't support remove currently, so we use
+            #  dict to store tool context in memory and remove the specific
+            #  tool context from dict after consuming. This will cause write 
and
+            #  read amplification for we need get the whole dict and overwrite 
it
+            #  to memory each time we update a specific tool context.
+            #  After memory supports remove, we can use 
"TOOL_CALL_CONTEXT/request_id"
+            #  to store and remove the specific tool context directly.
+            # save tool call context
+            tool_call_context = short_term_memory.get(TOOL_CALL_CONTEXT)
+            if not tool_call_context:
+                tool_call_context = {}
+            if request_id not in tool_call_context:
+                tool_call_context[request_id] = event
+            # append response to request event messages
+            tool_call_context[request_id].messages.append(response)
+            # update short term memory
+            short_term_memory.set(TOOL_CALL_CONTEXT, tool_call_context)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=event.id,
+                    tool_calls=response.tool_calls,
                 )
-
+            )
         # send response
         else:
+            # clear tool call context related to specific request id
+            tool_call_context = short_term_memory.get(TOOL_CALL_CONTEXT)
+            if tool_call_context and request_id in tool_call_context:
+                tool_call_context.pop(request_id)
+                short_term_memory.set(TOOL_CALL_CONTEXT, tool_call_context)
             ctx.send_event(ChatResponseEvent(request=event, response=response))
     elif isinstance(event, ToolResponseEvent):
-        state = ctx.get_short_term_memory()
-
-        if state.is_exist("__tool_context"):
-            tool_context = state.get("__tool_context")
-            tool_call_id = event.request.id
-            if tool_context is not None and tool_call_id in tool_context:
-                # get the specific tool call context from short term memory
-                specific_tool_ctx = tool_context.pop(tool_call_id)
-                specific_tool_ctx.messages.append(
-                    ChatMessage(role=MessageRole.TOOL, 
content=str(event.response))
-                )
-                ctx.send_event(specific_tool_ctx)
-                # update short term memory to remove the specific tool call 
context
-                state.set("__tool_context", tool_context)
+        request_id = event.request.id
+        responses = event.responses
+        # get tool call context
+        tool_call_context = short_term_memory.get(TOOL_CALL_CONTEXT)
+        for response in responses.values():
+            tool_call_context[request_id].messages.append(
+                ChatMessage(role=MessageRole.TOOL, content=str(response))
+            )
+        # update tool call context
+        short_term_memory.set(TOOL_CALL_CONTEXT, tool_call_context)
+        process_chat_request_or_tool_response(
+            event=tool_call_context[request_id], ctx=ctx
+        )

Review Comment:
   I'd suggest refactoring the code so that only the necessary parts are called 
recursively.



##########
python/flink_agents/plan/actions/chat_model_action.py:
##########
@@ -25,59 +25,65 @@
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.function import PythonFunction
 
+TOOL_CALL_CONTEXT = "_TOOL_CALL_CONTEXT"
+
 
 def process_chat_request_or_tool_response(event: Event, ctx: RunnerContext) -> 
None:
     """Built-in action for processing a chat request or tool response."""
+    short_term_memory = ctx.get_short_term_memory()
     if isinstance(event, ChatRequestEvent):
         chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
         # TODO: support async execution of chat.
         response = chat_model.chat(event.messages)
+
+        request_id = event.id
         # call tool
         if len(response.tool_calls) > 0:
-            for tool_call in response.tool_calls:
-                # store the tool call context in short term memory
-                state = ctx.get_short_term_memory()
-                # TODO: Because memory doesn't support remove currently, so we 
use
-                #  dict to store tool context in memory and remove the specific
-                #  tool context from dict after consuming. This will cause some
-                #  overhead for we need get the whole dict and overwrite it to 
memory
-                #  each time we update a specific tool context.
-                #  After memory supports remove, we can use
-                #  "__tool_context/tool_call_id" to store and remove the 
specific tool
-                #  context directly.
-                if not state.is_exist("__tool_context"):
-                    state.set("__tool_context", {})
-                tool_context = state.get("__tool_context")
-                tool_call_id = tool_call["id"]
-                tool_context[tool_call_id] = event
-                tool_context[tool_call_id].messages.append(response)
-                state.set("__tool_context", tool_context)
-                ctx.send_event(
-                    ToolRequestEvent(
-                        id=tool_call_id,
-                        tool=tool_call["function"]["name"],
-                        kwargs=tool_call["function"]["arguments"],
-                    )
+            # TODO: Because memory doesn't support remove currently, so we use
+            #  dict to store tool context in memory and remove the specific
+            #  tool context from dict after consuming. This will cause write 
and
+            #  read amplification for we need get the whole dict and overwrite 
it
+            #  to memory each time we update a specific tool context.
+            #  After memory supports remove, we can use 
"TOOL_CALL_CONTEXT/request_id"
+            #  to store and remove the specific tool context directly.
+            # save tool call context
+            tool_call_context = short_term_memory.get(TOOL_CALL_CONTEXT)
+            if not tool_call_context:
+                tool_call_context = {}
+            if request_id not in tool_call_context:
+                tool_call_context[request_id] = event
+            # append response to request event messages
+            tool_call_context[request_id].messages.append(response)
+            # update short term memory
+            short_term_memory.set(TOOL_CALL_CONTEXT, tool_call_context)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=event.id,
+                    tool_calls=response.tool_calls,
                 )
-
+            )
         # send response
         else:
+            # clear tool call context related to specific request id

Review Comment:
   We should make it explicit that a response without tool calls means the end of 
the chat.



##########
python/flink_agents/runtime/local_execution_environment.py:
##########
@@ -54,6 +54,14 @@ def apply(self, agent: Agent) -> AgentBuilder:
         if self.__runner is not None:
             err_msg = "LocalAgentBuilder doesn't support apply multiple 
agents."
             raise RuntimeError(err_msg)
+        # inspect refer actions and resources from env to agent.
+        for action_name in agent._action_names:
+            agent.actions[action_name] = self.__env.actions[action_name]
+        for type, names in agent._resource_names.items():

Review Comment:
   How do we know `agent._action_names` and `agent._resource_names`?



##########
python/flink_agents/examples/react_agent_example.py:
##########
@@ -0,0 +1,138 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import os
+
+from pydantic import BaseModel
+
+from flink_agents.api.agents.react_agent import ReActAgent
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.api.prompts.prompt import Prompt
+from flink_agents.integrations.chat_models.ollama_chat_model import (
+    OllamaChatModelConnection,
+    OllamaChatModelSetup,
+)
+
+model = os.environ.get("OLLAMA_CHAT_MODEL", "qwen2.5:7b")
+
+
+class InputData(BaseModel):  # noqa: D101
+    a: int
+    b: int
+    c: int
+
+
+class OutputData(BaseModel):  # noqa: D101
+    result: int
+
+
+def add(a: int, b: int) -> int:
+    """Calculate the sum of a and b.
+
+    Parameters
+    ----------
+    a : int
+        The first operand
+    b : int
+        The second operand
+
+    Returns:
+    -------
+    int:
+        The sum of a and b
+    """
+    return a + b
+
+
+def multiply(a: int, b: int) -> int:
+    """Useful function to multiply two numbers.
+
+    Parameters
+    ----------
+    a : int
+        The first operand
+    b : int
+        The second operand
+
+    Returns:
+    -------
+    int:
+        The product of a and b
+    """
+    return a * b
+
+
+def output_parser(output: str) -> OutputData:
+    """Parser function to parse llm output.
+
+    Parameters
+    ----------
+    output : str
+        The llm output.
+
+    Returns:
+    -------
+    OutputData:
+        The parsed output.
+    """
+    return OutputData.model_validate_json(output)

Review Comment:
   This just pushes the parsing work onto users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to