xintongsong commented on code in PR #81:
URL: https://github.com/apache/flink-agents/pull/81#discussion_r2244241752


##########
python/flink_agents/api/tools/utils.py:
##########
@@ -0,0 +1,171 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import typing
+from inspect import signature
+from typing import Any, Callable, Optional, Type, Union
+
+from docstring_parser import parse
+from pydantic import BaseModel, create_model
+from pydantic.fields import Field, FieldInfo
+
+
+def create_schema_from_function(name: str, func: Callable) -> Type[BaseModel]:
+    """Create a pydantic schema from a function's signature."""
+    docstr = func.__doc__
+
+    docstr = parse(docstr)
+    doc_params = {}
+    for param in docstr.params:
+        doc_params[param.arg_name] = param
+
+    fields = {}
+    params = signature(func).parameters
+    for param_name in params:
+        param_type = params[param_name].annotation
+        param_default = params[param_name].default
+        description = doc_params[param_name].description
+
+        if typing.get_origin(param_type) is typing.Annotated:
+            args = typing.get_args(param_type)
+            param_type = args[0]
+            if isinstance(args[1], str):
+                description = args[1]
+            elif isinstance(args[1], FieldInfo):
+                description = args[1].description
+
+        if param_type is params[param_name].empty:
+            param_type = typing.Any
+
+        if param_default is params[param_name].empty:
+            # Required field
+            fields[param_name] = (param_type, 
FieldInfo(description=description))
+        elif isinstance(param_default, FieldInfo):
+            # Field with pydantic.Field as default value
+            fields[param_name] = (param_type, param_default)
+        else:
+            fields[param_name] = (
+                param_type,
+                FieldInfo(default=param_default, description=description),
+            )
+
+    return create_model(name, **fields)
+
+TYPE_MAPPING: dict[str, type] = {
+    "string": str,
+    "integer": int,
+    "number": float,
+    "boolean": bool,
+    "object": dict,
+    "array": list,
+    "null": type(None),
+}
+
+CONSTRAINT_MAPPING: dict[str, str] = {
+    "minimum": "ge",
+    "maximum": "le",
+    "exclusiveMinimum": "gt",
+    "exclusiveMaximum": "lt",
+    "inclusiveMinimum": "ge",
+    "inclusiveMaximum": "le",
+    "minItems": "min_length",
+    "maxItems": "max_length",
+}
+
+
+def get_field_params_from_field_schema(field_schema: dict) -> dict:

Review Comment:
   Should this be private?



##########
python/flink_agents/api/prompt.py:
##########
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from string import Formatter
+from typing import List, Sequence
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.resource import ResourceType, SerializableResource
+
+
+class Prompt(SerializableResource):
+    """Prompt for a language model.
+
+    Attributes:
+    ----------
+    template : Sequence[ChatMessage]
+        The prompt template.
+    """
+
+    template: Sequence[ChatMessage]
+
+    @staticmethod
+    def from_messages(name: str, messages: Sequence[ChatMessage]) -> "Prompt":
+        """Create prompt from sequence of ChatMessage."""
+        return Prompt(name=name, template=messages)
+
+    @staticmethod
+    def from_text(
+        name: str, text: str, role: MessageRole = MessageRole.USER
+    ) -> "Prompt":
+        """Create prompt from text string."""
+        return Prompt(name=name, template=[ChatMessage(role=role, 
content=text)])
+
+    @classmethod
+    def resource_type(cls) -> ResourceType:
+        """Get the resource type."""
+        return ResourceType.PROMPT
+
+    def format_string(self, **kwargs: str) -> str:
+        """Generate text string from template with input arguments."""
+        msgs = []
+        for m in self.template:
+            msg = f"{m.role.value}: {Formatter().format(m.content, **kwargs)}"

Review Comment:
   This means if I create a prompt from a string, it will always add a "USER: " 
before my prompt.



##########
python/flink_agents/plan/tools/function_tool.py:
##########
@@ -15,19 +15,77 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from typing import Any, Dict, Tuple
+from typing import Any, Callable
 
-from flink_agents.api.tools.tool import BaseTool
-from flink_agents.plan.function import Function
+from docstring_parser import parse
+from pydantic import field_serializer, model_validator
+from typing_extensions import override
+
+from flink_agents.api.tools.tool import BaseTool, ToolMetadata, ToolType
+from flink_agents.api.tools.utils import create_schema_from_function
+from flink_agents.plan.function import Function, JavaFunction, PythonFunction
 
 
-#TODO: Complete FunctionTool
 class FunctionTool(BaseTool):
-    """Function tool.
+    """Tool that takes in a function.
 
-    Currently, this class is just for testing purposes.
+    Attributes:
+    ----------
+    func : Function
+        User defined function.
     """
+
     func: Function
-    def call(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) -> Any:
-        """Call function."""
+
+    @field_serializer("func")
+    def __serialize_func(self, func: Function) -> dict:
+        # append meta info to help deserialize exec
+        data = func.model_dump()
+        data["func_type"] = func.__class__.__qualname__
+        return data

Review Comment:
   Why do we need this here? `Function` should be self-contained.



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)
+    # call tool
+    if len(response.tool_calls) > 0:
+        for tool_call in response.tool_calls:
+            # store the tool call context in short term memory
+            state = ctx.get_short_term_memory()
+            if not state.is_exist("__tool_context"):
+                state.set("__tool_context", {})
+            tool_context = state.get("__tool_context")
+            tool_call_id = tool_call["id"]
+            tool_context[tool_call_id] = event
+            tool_context[tool_call_id].messages.append(response)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=tool_call_id,
+                    tool=tool_call["function"]["name"],
+                    kwargs=tool_call["function"]["arguments"],
+                )
+            )
+
+    # send response
+    else:
+        ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a tool call request."""
+    tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+    response = tool.call(**event.kwargs)
+    ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) -> 
None:

Review Comment:
   We can process the tool response in the chat model action.



##########
python/flink_agents/plan/agent_plan.py:
##########
@@ -124,6 +124,15 @@ def from_agent(agent: Agent) -> "AgentPlan":
                     actions_by_event[event_type] = []
                 actions_by_event[event_type].append(action.name)
 
+        # append built-in actions
+        for action in BUILT_IN_ACTIONS:

Review Comment:
   Is the handling of built-in actions different from user-defined actions? 
Shall we just put them in the same list before handling?



##########
python/flink_agents/api/prompt.py:
##########
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from string import Formatter
+from typing import List, Sequence
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.resource import ResourceType, SerializableResource
+
+
+class Prompt(SerializableResource):
+    """Prompt for a language model.
+
+    Attributes:
+    ----------
+    template : Sequence[ChatMessage]
+        The prompt template.
+    """
+
+    template: Sequence[ChatMessage]
+
+    @staticmethod
+    def from_messages(name: str, messages: Sequence[ChatMessage]) -> "Prompt":
+        """Create prompt from sequence of ChatMessage."""
+        return Prompt(name=name, template=messages)
+
+    @staticmethod
+    def from_text(
+        name: str, text: str, role: MessageRole = MessageRole.USER
+    ) -> "Prompt":
+        """Create prompt from text string."""
+        return Prompt(name=name, template=[ChatMessage(role=role, 
content=text)])
+
+    @classmethod
+    def resource_type(cls) -> ResourceType:
+        """Get the resource type."""
+        return ResourceType.PROMPT
+
+    def format_string(self, **kwargs: str) -> str:
+        """Generate text string from template with input arguments."""
+        msgs = []
+        for m in self.template:
+            msg = f"{m.role.value}: {Formatter().format(m.content, **kwargs)}"

Review Comment:
   We should not create a new `Formatter` for every message; create it once and reuse it.



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)
+    # call tool
+    if len(response.tool_calls) > 0:
+        for tool_call in response.tool_calls:
+            # store the tool call context in short term memory
+            state = ctx.get_short_term_memory()
+            if not state.is_exist("__tool_context"):
+                state.set("__tool_context", {})
+            tool_context = state.get("__tool_context")
+            tool_call_id = tool_call["id"]
+            tool_context[tool_call_id] = event
+            tool_context[tool_call_id].messages.append(response)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=tool_call_id,
+                    tool=tool_call["function"]["name"],
+                    kwargs=tool_call["function"]["arguments"],
+                )
+            )
+
+    # send response
+    else:
+        ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a tool call request."""
+    tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+    response = tool.call(**event.kwargs)
+    ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) -> 
None:
+    """Built-in action for processing a tool call response."""
+    state = ctx.get_short_term_memory()
+
+    if state.is_exist("__tool_context"):
+        tool_context = state.get("__tool_context")
+        tool_call_id = event.request.id
+        if tool_call_id in tool_context:
+            # get the tool call context from short term memory
+            tool_context = tool_context[tool_call_id]
+            tool_context.messages.append(
+                ChatMessage(role=MessageRole.TOOL, content=str(event.response))
+            )
+            ctx.send_event(tool_context)
+
+
+PROCESS_CHAT_REQUEST = Action(
+    name="process_chat_request",
+    exec=PythonFunction.from_callable(process_chat_request),
+    
listen_event_types=[f"{ChatRequestEvent.__module__}.{ChatRequestEvent.__name__}"],
+)
+PROCESS_TOOL_REQUEST = Action(

Review Comment:
   ```suggestion
   TOOL_USE_ACTION = Action(
   ```



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)

Review Comment:
   Model and tool should be called asynchronously. Let's add a TODO here and 
address it once async execution is implemented.



##########
python/flink_agents/api/prompt.py:
##########
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from string import Formatter
+from typing import List, Sequence
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.resource import ResourceType, SerializableResource
+
+
+class Prompt(SerializableResource):
+    """Prompt for a language model.
+
+    Attributes:
+    ----------
+    template : Sequence[ChatMessage]
+        The prompt template.
+    """
+
+    template: Sequence[ChatMessage]
+
+    @staticmethod
+    def from_messages(name: str, messages: Sequence[ChatMessage]) -> "Prompt":
+        """Create prompt from sequence of ChatMessage."""
+        return Prompt(name=name, template=messages)
+
+    @staticmethod
+    def from_text(
+        name: str, text: str, role: MessageRole = MessageRole.USER

Review Comment:
   Why is the default role `USER`?



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)
+    # call tool
+    if len(response.tool_calls) > 0:
+        for tool_call in response.tool_calls:
+            # store the tool call context in short term memory
+            state = ctx.get_short_term_memory()
+            if not state.is_exist("__tool_context"):
+                state.set("__tool_context", {})
+            tool_context = state.get("__tool_context")
+            tool_call_id = tool_call["id"]
+            tool_context[tool_call_id] = event
+            tool_context[tool_call_id].messages.append(response)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=tool_call_id,
+                    tool=tool_call["function"]["name"],
+                    kwargs=tool_call["function"]["arguments"],
+                )
+            )
+
+    # send response
+    else:
+        ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a tool call request."""
+    tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+    response = tool.call(**event.kwargs)
+    ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) -> 
None:
+    """Built-in action for processing a tool call response."""
+    state = ctx.get_short_term_memory()
+
+    if state.is_exist("__tool_context"):
+        tool_context = state.get("__tool_context")
+        tool_call_id = event.request.id
+        if tool_call_id in tool_context:
+            # get the tool call context from short term memory
+            tool_context = tool_context[tool_call_id]
+            tool_context.messages.append(
+                ChatMessage(role=MessageRole.TOOL, content=str(event.response))
+            )
+            ctx.send_event(tool_context)
+
+
+PROCESS_CHAT_REQUEST = Action(

Review Comment:
   ```suggestion
   CHAT_MODEL_ACTION = Action(
   ```



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)
+    # call tool
+    if len(response.tool_calls) > 0:
+        for tool_call in response.tool_calls:
+            # store the tool call context in short term memory
+            state = ctx.get_short_term_memory()
+            if not state.is_exist("__tool_context"):
+                state.set("__tool_context", {})
+            tool_context = state.get("__tool_context")
+            tool_call_id = tool_call["id"]
+            tool_context[tool_call_id] = event
+            tool_context[tool_call_id].messages.append(response)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=tool_call_id,
+                    tool=tool_call["function"]["name"],
+                    kwargs=tool_call["function"]["arguments"],
+                )
+            )
+
+    # send response
+    else:
+        ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a tool call request."""
+    tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+    response = tool.call(**event.kwargs)
+    ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) -> 
None:
+    """Built-in action for processing a tool call response."""
+    state = ctx.get_short_term_memory()
+
+    if state.is_exist("__tool_context"):
+        tool_context = state.get("__tool_context")
+        tool_call_id = event.request.id
+        if tool_call_id in tool_context:
+            # get the tool call context from short term memory
+            tool_context = tool_context[tool_call_id]

Review Comment:
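   The name here is confusing: `tool_context` is rebound from the map of all tool call contexts to the single entry for this tool call.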



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)
+    # call tool
+    if len(response.tool_calls) > 0:
+        for tool_call in response.tool_calls:
+            # store the tool call context in short term memory
+            state = ctx.get_short_term_memory()
+            if not state.is_exist("__tool_context"):
+                state.set("__tool_context", {})
+            tool_context = state.get("__tool_context")
+            tool_call_id = tool_call["id"]
+            tool_context[tool_call_id] = event
+            tool_context[tool_call_id].messages.append(response)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=tool_call_id,
+                    tool=tool_call["function"]["name"],
+                    kwargs=tool_call["function"]["arguments"],
+                )
+            )
+
+    # send response
+    else:
+        ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a tool call request."""
+    tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+    response = tool.call(**event.kwargs)
+    ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) -> 
None:
+    """Built-in action for processing a tool call response."""
+    state = ctx.get_short_term_memory()
+
+    if state.is_exist("__tool_context"):
+        tool_context = state.get("__tool_context")
+        tool_call_id = event.request.id
+        if tool_call_id in tool_context:
+            # get the tool call context from short term memory
+            tool_context = tool_context[tool_call_id]
+            tool_context.messages.append(
+                ChatMessage(role=MessageRole.TOOL, content=str(event.response))
+            )
+            ctx.send_event(tool_context)
+
+
+PROCESS_CHAT_REQUEST = Action(
+    name="process_chat_request",
+    exec=PythonFunction.from_callable(process_chat_request),
+    
listen_event_types=[f"{ChatRequestEvent.__module__}.{ChatRequestEvent.__name__}"],
+)
+PROCESS_TOOL_REQUEST = Action(
+    name="process_tool_request",
+    exec=PythonFunction.from_callable(process_tool_request),
+    
listen_event_types=[f"{ToolRequestEvent.__module__}.{ToolRequestEvent.__name__}"],
+)
+PROCESS_TOOL_RESPONSE = Action(
+    name="process_tool_response",
+    exec=PythonFunction.from_callable(process_tool_response),
+    
listen_event_types=[f"{ToolResponseEvent.__module__}.{ToolResponseEvent.__name__}"],
+)
+
+BUILT_IN_ACTIONS = [PROCESS_CHAT_REQUEST, PROCESS_TOOL_REQUEST, 
PROCESS_TOOL_RESPONSE]

Review Comment:
   I'd suggest organizing built-in actions and events by functionality, e.g., 
putting ChatRequestEvent, ChatResponseEvent and ChatAction in one file.



##########
python/flink_agents/plan/agent_plan.py:
##########
@@ -169,7 +178,23 @@ def get_resource(self, name: str, type: ResourceType) -> 
Resource:
             self.__resources[type] = {}
         if name not in self.__resources[type]:
             resource_provider = self.resource_providers[type][name]
-            self.__resources[type][name] = resource_provider.provide()
+            resource = resource_provider.provide()
+            if resource.resource_type() == ResourceType.CHAT_MODEL:
+                # bind tools
+                if resource.tools is not None:
+                    resource.bind_tools(
+                        [
+                            self.get_resource(name, ResourceType.TOOL)
+                            for name in resource.tools
+                        ]
+                    )
+                # bind prompt
+                if resource.prompt is not None and isinstance(resource.prompt, 
str):
+                    resource.prompt = self.get_resource(
+                        resource.prompt, ResourceType.PROMPT
+                    )
+
+            self.__resources[type][name] = resource

Review Comment:
   Binding the tools and the prompt to the chat model should be done inside the provider.



##########
python/flink_agents/plan/action.py:
##########
@@ -41,40 +49,109 @@ class Action(BaseModel):
     """
 
     name: str
-    #TODO: Raise a warning when the action has a return value, as it will be 
ignored.
+    # TODO: Raise a warning when the action has a return value, as it will be 
ignored.
     exec: Function
     listen_event_types: List[str]
 
-    @field_serializer('exec')
+    @field_serializer("exec")
     def __serialize_exec(self, exec: Function) -> dict:
         # append meta info to help deserialize exec
         data = exec.model_dump()
-        data['func_type'] = exec.__class__.__qualname__
+        data["func_type"] = exec.__class__.__qualname__
         return data
 
-    @model_validator(mode='before')
-    def __custom_deserialize(self) -> 'Action':
-        exec = self['exec']
+    @model_validator(mode="before")
+    def __custom_deserialize(self) -> "Action":
+        exec = self["exec"]
         # restore exec from serialized json.
         if isinstance(exec, dict):
-            func_type = exec['func_type']
-            if func_type == 'PythonFunction':
-                self['exec'] = PythonFunction(**exec)
-            elif func_type == 'JavaFunction':
-                self['exec'] = JavaFunction(**exec)
+            func_type = exec["func_type"]
+            if func_type == "PythonFunction":
+                self["exec"] = PythonFunction(**exec)
+            elif func_type == "JavaFunction":
+                self["exec"] = JavaFunction(**exec)
             else:
-                err_msg = f'Unknown function type: {func_type}'
+                err_msg = f"Unknown function type: {func_type}"
                 raise NotImplementedError(err_msg)
         return self
 
     def __init__(
-            self,
-            name: str,
-            exec: Function,
-            listen_event_types: List[str],
+        self,
+        name: str,
+        exec: Function,
+        listen_event_types: List[str],
     ) -> None:
         """Action will check function signature when init."""
         super().__init__(name=name, exec=exec, 
listen_event_types=listen_event_types)
-        #TODO: Update expected signature after import State and Context.
+        # TODO: Update expected signature after import State and Context.
         self.exec.check_signature(Event, RunnerContext)
 
+
+def process_chat_request(event: ChatRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a chat request."""
+    chat_model = ctx.get_resource(event.model, ResourceType.CHAT_MODEL)
+    response = chat_model.chat(event.messages)
+    # call tool
+    if len(response.tool_calls) > 0:
+        for tool_call in response.tool_calls:
+            # store the tool call context in short term memory
+            state = ctx.get_short_term_memory()
+            if not state.is_exist("__tool_context"):
+                state.set("__tool_context", {})
+            tool_context = state.get("__tool_context")
+            tool_call_id = tool_call["id"]
+            tool_context[tool_call_id] = event
+            tool_context[tool_call_id].messages.append(response)
+            ctx.send_event(
+                ToolRequestEvent(
+                    id=tool_call_id,
+                    tool=tool_call["function"]["name"],
+                    kwargs=tool_call["function"]["arguments"],
+                )
+            )
+
+    # send response
+    else:
+        ctx.send_event(ChatResponseEvent(request=event, response=response))
+
+
+def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> None:
+    """Built-in action for processing a tool call request."""
+    tool = ctx.get_resource(event.tool, ResourceType.TOOL)
+    response = tool.call(**event.kwargs)
+    ctx.send_event(ToolResponseEvent(request=event, response=response))
+
+
+def process_tool_response(event: ToolResponseEvent, ctx: RunnerContext) -> 
None:
+    """Built-in action for processing a tool call response."""
+    state = ctx.get_short_term_memory()
+
+    if state.is_exist("__tool_context"):
+        tool_context = state.get("__tool_context")
+        tool_call_id = event.request.id
+        if tool_call_id in tool_context:
+            # get the tool call context from short term memory
+            tool_context = tool_context[tool_call_id]

Review Comment:
   Shall we also remove this tool call's entry from `__tool_context` once it has been retrieved?



##########
python/flink_agents/api/prompt.py:
##########
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from string import Formatter
+from typing import List, Sequence
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.resource import ResourceType, SerializableResource
+
+
+class Prompt(SerializableResource):
+    """Prompt for a language model.
+
+    Attributes:
+    ----------
+    template : Sequence[ChatMessage]
+        The prompt template.
+    """
+
+    template: Sequence[ChatMessage]
+
+    @staticmethod
+    def from_messages(name: str, messages: Sequence[ChatMessage]) -> "Prompt":
+        """Create prompt from sequence of ChatMessage."""
+        return Prompt(name=name, template=messages)
+
+    @staticmethod
+    def from_text(
+        name: str, text: str, role: MessageRole = MessageRole.USER
+    ) -> "Prompt":
+        """Create prompt from text string."""
+        return Prompt(name=name, template=[ChatMessage(role=role, 
content=text)])
+
+    @classmethod
+    def resource_type(cls) -> ResourceType:
+        """Get the resource type."""
+        return ResourceType.PROMPT
+
+    def format_string(self, **kwargs: str) -> str:
+        """Generate text string from template with input arguments."""
+        msgs = []
+        for m in self.template:
+            msg = f"{m.role.value}: {Formatter().format(m.content, **kwargs)}"

Review Comment:
   What happens if an argument is not provided?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to