This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new fd6ad26  [API][Python] Support DataStream / Table  as input / output of Flink Agents in Python (#48)
fd6ad26 is described below

commit fd6ad26ce329bfe70d6a3a4a02ae9346fb6dbe48
Author: Wenjin Xie <166717626+wenjin...@users.noreply.github.com>
AuthorDate: Fri Jul 11 16:49:59 2025 +0800

    [API][Python] Support DataStream / Table  as input / output of Flink Agents in Python (#48)
---
 .licenserc.yaml                                    |   1 +
 python/flink_agents/api/event.py                   |   5 +
 python/flink_agents/api/execution_enviroment.py    |  80 --------
 python/flink_agents/api/execution_environment.py   | 172 ++++++++++++++++
 python/flink_agents/api/tests/test_event.py        |   4 +
 python/flink_agents/examples/agent_example.py      |  27 +--
 .../integrate_datastream_with_agent_example.py     |  89 ++++++++
 .../examples/integrate_table_with_agent_example.py | 119 +++++++++++
 python/flink_agents/examples/my_agent.py           |  98 +++++++++
 .../flink_agents/examples/resources/input_data.txt |  10 +
 python/flink_agents/plan/agent_plan.py             |   6 +-
 python/flink_agents/plan/tests/test_agent_plan.py  |   4 +-
 .../runtime/local_execution_environment.py         | 110 ++++++++--
 python/flink_agents/runtime/local_runner.py        |   2 +-
 python/flink_agents/runtime/python_java_utils.py   |  32 +--
 .../runtime/remote_execution_environment.py        | 223 +++++++++++++++++++++
 .../tests/test_local_execution_environment.py      |  13 +-
 python/requirements/test_requirements.txt          |   1 +
 18 files changed, 863 insertions(+), 133 deletions(-)

diff --git a/.licenserc.yaml b/.licenserc.yaml
index 673b1ac..0e0e64e 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,6 +26,7 @@ header:
     - 'NOTICE'
     - '.gitignore'
     - '**/*.json'
+    - '**/*.txt'
   comment: on-failure
   dependency:
     files:
diff --git a/python/flink_agents/api/event.py b/python/flink_agents/api/event.py
index 01084f5..4f85648 100644
--- a/python/flink_agents/api/event.py
+++ b/python/flink_agents/api/event.py
@@ -20,6 +20,7 @@ from typing import Any
 from uuid import UUID, uuid4
 
 from pydantic import BaseModel, Field, model_validator
+from pyflink.common import Row
 
 
 class Event(BaseModel, ABC, extra="allow"):
@@ -36,6 +37,10 @@ class Event(BaseModel, ABC, extra="allow"):
     @model_validator(mode='after')
     def validate_extra(self) -> 'Event':
         """Ensure init fields is serializable."""
+        #TODO: support Event contains Row field be json serializable
+        for value in self.model_dump().values():
+            if isinstance(value, Row):
+                return self
         self.model_dump_json()
         return self
 
diff --git a/python/flink_agents/api/execution_enviroment.py b/python/flink_agents/api/execution_enviroment.py
deleted file mode 100644
index ebabc05..0000000
--- a/python/flink_agents/api/execution_enviroment.py
+++ /dev/null
@@ -1,80 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-#################################################################################
-import importlib
-from abc import ABC, abstractmethod
-from typing import Any, Dict, List
-
-from flink_agents.api.agent import Agent
-
-
-class AgentsExecutionEnvironment(ABC):
-    """Base class for agent execution environment."""
-
-    @staticmethod
-    def get_execution_environment(**kwargs: Dict[str, Any]) -> 'AgentsExecutionEnvironment':
-        """Get agents execution environment.
-
-        Currently, this method only returns LocalExecutionEnvironment. After
-        implement other AgentsExecutionEnvironments, this method will return
-        appropriate environment according to configuration.
-
-        Returns:
-        -------
-        AgentsExecutionEnvironment
-            Environment for agent execution.
-        """
-        return importlib.import_module(
-            "flink_agents.runtime.local_execution_environment"
-        ).get_execution_environment(**kwargs)
-
-    @abstractmethod
-    def from_list(self, input: List[Dict[str, Any]]) -> 'AgentsExecutionEnvironment':
-        """Set input for agents. Used for local execution.
-
-        Parameters
-        ----------
-        input : list
-            Receive a list as input. The element in the list should be a dict like
-            {'key': Any, 'value': Any} or {'value': Any} , extra field will be ignored.
-        """
-
-    @abstractmethod
-    def apply(self, agent: Agent) -> 'AgentsExecutionEnvironment':
-        """Set agent of execution environment.
-
-        Parameters
-        ----------
-        agent : Agent
-            The agent user defined to run in execution environment.
-        """
-
-    @abstractmethod
-    def to_list(self) -> List[Dict[str, Any]]:
-        """Get outputs of agent execution. Used for local execution.
-
-        The element in the list is a dict like {'key': output}.
-
-        Returns:
-        -------
-        list
-            Outputs of agent execution.
-        """
-
-    @abstractmethod
-    def execute(self) -> None:
-        """Execute agents."""
diff --git a/python/flink_agents/api/execution_environment.py b/python/flink_agents/api/execution_environment.py
new file mode 100644
index 0000000..e6d9efd
--- /dev/null
+++ b/python/flink_agents/api/execution_environment.py
@@ -0,0 +1,172 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import importlib
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, Optional
+
+from pyflink.common import TypeInformation
+from pyflink.datastream import DataStream, KeySelector, StreamExecutionEnvironment
+from pyflink.table import Schema, StreamTableEnvironment, Table
+
+from flink_agents.api.agent import Agent
+
+
+class AgentBuilder(ABC):
+    """Builder for integrating agent with input and output."""
+
+    @abstractmethod
+    def apply(self, agent: Agent) -> "AgentBuilder":
+        """Set agent of AgentBuilder.
+
+        Parameters
+        ----------
+        agent : Agent
+            The agent user defined to run in execution environment.
+        """
+
+    @abstractmethod
+    def to_list(self) -> List[Dict[str, Any]]:
+        """Get output list of agent execution.
+
+        The element in the list is a dict like {'key': output}.
+
+        Returns:
+        -------
+        list
+            Outputs of agent execution.
+        """
+
+    @abstractmethod
+    def to_datastream(self) -> DataStream:
+        """Get output datastream of agent execution.
+
+        Returns:
+        -------
+        DataStream
+            Output datastream of agent execution.
+        """
+
+    # TODO: auto generate output_type.
+    @abstractmethod
+    def to_table(self, schema: Schema, output_type: TypeInformation) -> Table:
+        """Get output table of agent execution.
+
+        Parameters
+        ----------
+        schema : Schema
+            Indicate schema of the output table.
+        output_type : TypeInformation
+            Indicate schema corresponding type information.
+
+        Returns:
+        -------
+        Table
+            Output table of agent execution.
+        """
+
+
+class AgentsExecutionEnvironment(ABC):
+    """Base class for agent execution environment."""
+
+    @staticmethod
+    def get_execution_environment(
+        env: Optional[StreamExecutionEnvironment] = None, **kwargs: Dict[str, Any]
+    ) -> "AgentsExecutionEnvironment":
+        """Get agents execution environment.
+
+        Currently, user can run flink agents in ide using LocalExecutionEnvironment or
+        RemoteExecutionEnvironment. To distinguish which environment to use, when run
+        flink agents with pyflink datastream/table, user should pass flink
+        StreamExecutionEnvironment when get AgentsExecutionEnvironment.
+
+        Returns:
+        -------
+        AgentsExecutionEnvironment
+            Environment for agent execution.
+        """
+        if env is None:
+            return importlib.import_module(
+                "flink_agents.runtime.local_execution_environment"
+            ).create_instance(env=env, **kwargs)
+        else:
+            return importlib.import_module(
+                "flink_agents.runtime.remote_execution_environment"
+            ).create_instance(env=env, **kwargs)
+
+    @abstractmethod
+    def from_list(self, input: List[Dict[str, Any]]) -> AgentBuilder:
+        """Set input for agents. Used for local execution.
+
+        Parameters
+        ----------
+        input : list
+            Receive a list as input. The element in the list should be a dict like
+            {'key': Any, 'value': Any} or {'value': Any} , extra field will be ignored.
+
+        Returns:
+        -------
+        AgentBuilder
+            A new builder to build an agent for specific input.
+        """
+
+    @abstractmethod
+    def from_datastream(
+        self, input: DataStream, key_selector: Optional[KeySelector] = None
+    ) -> AgentBuilder:
+        """Set input for agents. Used for remote execution.
+
+        Parameters
+        ----------
+        input : DataStream
+            Receive a DataStream as input.
+        key_selector : KeySelector
+            Extract key from each input record.
+
+        Returns:
+        -------
+        AgentBuilder
+            A new builder to build an agent for specific input.
+        """
+
+    @abstractmethod
+    def from_table(
+        self,
+        input: Table,
+        t_env: StreamTableEnvironment,
+        key_selector: Optional[KeySelector] = None,
+    ) -> AgentBuilder:
+        """Set input for agents. Used for remote execution.
+
+        Parameters
+        ----------
+        input : Table
+            Receive a Table as input.
+        t_env: StreamTableEnvironment
+            table environment supports convert Table to/from DataStream.
+        key_selector : KeySelector
+            Extract key from each input record.
+
+        Returns:
+        -------
+        AgentBuilder
+            A new builder to build an agent for specific input.
+        """
+
+    @abstractmethod
+    def execute(self) -> None:
+        """Execute agent individually."""
diff --git a/python/flink_agents/api/tests/test_event.py b/python/flink_agents/api/tests/test_event.py
index ebc7821..eeed674 100644
--- a/python/flink_agents/api/tests/test_event.py
+++ b/python/flink_agents/api/tests/test_event.py
@@ -20,6 +20,7 @@ from typing import Type
 import pytest
 from pydantic import ValidationError
 from pydantic_core import PydanticSerializationError
+from pyflink.common import Row
 
 from flink_agents.api.event import Event, InputEvent, OutputEvent
 
@@ -40,3 +41,6 @@ def test_event_setattr_non_serializable() -> None: #noqa D103
     with pytest.raises(PydanticSerializationError):
         event.c = Type[InputEvent]
 
+def test_input_event_ignore_row_unserializable() -> None: #noqa D103
+    InputEvent(input=Row({"a": 1}))
+
diff --git a/python/flink_agents/examples/agent_example.py b/python/flink_agents/examples/agent_example.py
index 94f1319..ebc814c 100644
--- a/python/flink_agents/examples/agent_example.py
+++ b/python/flink_agents/examples/agent_example.py
@@ -17,36 +17,38 @@
 #################################################################################
 from typing import Any
 
+from flink_agents.api.agent import Agent
 from flink_agents.api.decorators import action
 from flink_agents.api.event import Event, InputEvent, OutputEvent
-from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
 from flink_agents.api.runner_context import RunnerContext
-from flink_agents.api.agent import Agent
 
 
-class MyEvent(Event): #noqa D101
+class MyEvent(Event):  # noqa D101
     value: Any
 
-#TODO: Replace this agent with more practical example.
+
+# TODO: Replace this agent with more practical example.
 class MyAgent(Agent):
     """An example of agent to show the basic usage.
 
     Currently, this agent doesn't really make sense, and it's mainly for developing
     validation.
     """
+
     @action(InputEvent)
     @staticmethod
-    def first_action(event: Event, ctx: RunnerContext): #noqa D102
+    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
         input = event.input
-        content = input + ' first_action'
+        content = input + " first_action"
         ctx.send_event(MyEvent(value=content))
         ctx.send_event(OutputEvent(output=content))
 
     @action(MyEvent)
     @staticmethod
-    def second_action(event: Event, ctx: RunnerContext): #noqa D102
+    def second_action(event: Event, ctx: RunnerContext):  # noqa D102
         input = event.value
-        content = input + ' second_action'
+        content = input + " second_action"
         ctx.send_event(OutputEvent(output=content))
 
 
@@ -58,10 +60,11 @@ if __name__ == "__main__":
 
     output_list = env.from_list(input_list).apply(agent).to_list()
 
-
-    input_list.append({'key': 'bob', 'value': 'The message from bob'})
-    input_list.append({'k': 'john', 'v': 'The message from john'})
-    input_list.append({'value': 'The message from unknown'}) # will automatically generate a new unique key
+    input_list.append({"key": "bob", "value": "The message from bob"})
+    input_list.append({"k": "john", "v": "The message from john"})
+    input_list.append(
+        {"value": "The message from unknown"}
+    )  # will automatically generate a new unique key
 
     env.execute()
 
diff --git a/python/flink_agents/examples/integrate_datastream_with_agent_example.py b/python/flink_agents/examples/integrate_datastream_with_agent_example.py
new file mode 100644
index 0000000..768e155
--- /dev/null
+++ b/python/flink_agents/examples/integrate_datastream_with_agent_example.py
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+
+from pyflink.common import Duration, WatermarkStrategy
+from pyflink.datastream import (
+    KeySelector,
+    RuntimeExecutionMode,
+    StreamExecutionEnvironment,
+)
+from pyflink.datastream.connectors.file_system import FileSource, StreamFormat
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.examples.my_agent import DataStreamAgent, ItemData
+
+
+class MyKeySelector(KeySelector):
+    """KeySelector for extracting key."""
+
+    def get_key(self, value: ItemData) -> int:
+        """Extract key from ItemData."""
+        return value.id
+
+
+current_dir = Path(__file__).parent
+
+# if this example raises exception "No module named 'flink_agents'", you could set
+# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
+# site-packages) in this file.
+if __name__ == "__main__":
+    env = StreamExecutionEnvironment.get_execution_environment()
+
+    # should compile flink-agents jars before run this example.
+    env.add_jars(
+        f"file:///{current_dir}/../../../runtime/target/flink-agents-runtime-0.1-SNAPSHOT.jar"
+    )
+    env.add_jars(
+        f"file:///{current_dir}/../../../plan/target/flink-agents-plan-0.1-SNAPSHOT.jar"
+    )
+    env.add_jars(
+        f"file:///{current_dir}/../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar"
+    )
+
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    # currently, bounded source is not supported due to runtime implementation, so
+    # we use continuous file source here.
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(), f"file:///{current_dir}/resources"
+        )
+        .monitor_continuously(Duration.of_minutes(1))
+        .build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="streaming_agent_example",
+    )
+
+    deserialize_datastream = input_datastream.map(
+        lambda x: ItemData.model_validate_json(x)
+    )
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    output_datastream = (
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=MyKeySelector()
+        )
+        .apply(DataStreamAgent())
+        .to_datastream()
+    )
+
+    output_datastream.print()
+
+    agents_env.execute()
diff --git a/python/flink_agents/examples/integrate_table_with_agent_example.py b/python/flink_agents/examples/integrate_table_with_agent_example.py
new file mode 100644
index 0000000..07b60c0
--- /dev/null
+++ b/python/flink_agents/examples/integrate_table_with_agent_example.py
@@ -0,0 +1,119 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from pathlib import Path
+
+from pyflink.common import Row
+from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo
+from pyflink.datastream import (
+    KeySelector,
+    RuntimeExecutionMode,
+    StreamExecutionEnvironment,
+)
+from pyflink.table import (
+    DataTypes,
+    Schema,
+    StreamTableEnvironment,
+    TableDescriptor,
+)
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.examples.my_agent import TableAgent
+
+current_dir = Path(__file__).parent
+
+
+class MyKeySelector(KeySelector):
+    """KeySelector for extracting key."""
+
+    def get_key(self, value: Row) -> int:
+        """Extract key from Row."""
+        return value[0]
+
+
+# if this example raises exception "No module named 'flink_agents'", you could set
+# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
+# site-packages) in this file.
+if __name__ == "__main__":
+    env = StreamExecutionEnvironment.get_execution_environment()
+
+    # should compile flink-agents jars before run this example.
+    env.add_jars(
+        f"file:///{current_dir}/../../../runtime/target/flink-agents-runtime-0.1-SNAPSHOT.jar"
+    )
+    env.add_jars(
+        f"file:///{current_dir}/../../../plan/target/flink-agents-plan-0.1-SNAPSHOT.jar"
+    )
+    env.add_jars(
+        f"file:///{current_dir}/../../../api/target/flink-agents-api-0.1-SNAPSHOT.jar"
+    )
+
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.create_temporary_table(
+        "source",
+        TableDescriptor.for_connector("filesystem")
+        .schema(
+            Schema.new_builder()
+            .column("id", DataTypes.BIGINT())
+            .column("review", DataTypes.STRING())
+            .column("review_score", DataTypes.FLOAT())
+            .build()
+        )
+        .option("format", "json")
+        .option("path", f"file:///{current_dir}/resources")
+        .option("source.monitor-interval", "60s")
+        .build(),
+    )
+
+    table = t_env.from_path("source")
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+
+    output_type = ExternalTypeInfo(
+        RowTypeInfo(
+            [
+                BasicTypeInfo.LONG_TYPE_INFO(),
+                BasicTypeInfo.STRING_TYPE_INFO(),
+                BasicTypeInfo.FLOAT_TYPE_INFO(),
+            ],
+            ["id", "review", "review_score"],
+        )
+    )
+
+    schema = (
+        Schema.new_builder()
+        .column("id", DataTypes.BIGINT())
+        .column("review", DataTypes.STRING())
+        .column("review_score", DataTypes.FLOAT())
+    ).build()
+
+    output_table = (
+        agents_env.from_table(input=table, t_env=t_env, key_selector=MyKeySelector())
+        .apply(TableAgent())
+        .to_table(schema=schema, output_type=output_type)
+    )
+
+    t_env.create_temporary_table(
+        "sink",
+        TableDescriptor.for_connector("print").schema(schema).build(),
+    )
+
+    output_table.execute_insert("sink").wait()
diff --git a/python/flink_agents/examples/my_agent.py b/python/flink_agents/examples/my_agent.py
new file mode 100644
index 0000000..0c982fa
--- /dev/null
+++ b/python/flink_agents/examples/my_agent.py
@@ -0,0 +1,98 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import copy
+from typing import Any
+
+from pydantic import BaseModel
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.decorators import action
+from flink_agents.api.event import Event, InputEvent, OutputEvent
+from flink_agents.api.runner_context import RunnerContext
+
+
+class ItemData(BaseModel):
+    """Data model for storing item information.
+
+    Attributes:
+    ----------
+    id : int
+        Unique identifier of the item
+    review : str
+        The user review of the item
+    review_score: float
+        The review_score of the item
+    """
+
+    id: int
+    review: str
+    review_score: float
+
+
+class MyEvent(Event):  # noqa D101
+    value: Any
+
+
+class DataStreamAgent(Agent):
+    """Agent used for explaining integrating agents with DataStream.
+
+    Because pemja will find action in this class when execute Agent, we can't
+    define this class directly in example.py for module name will be set
+    to __main__.
+    """
+
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
+        input = event.input
+        content = copy.deepcopy(input)
+        content.review += " first action"
+        ctx.send_event(MyEvent(value=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext):  # noqa D102
+        input = event.value
+        content = copy.deepcopy(input)
+        content.review += " second action"
+        ctx.send_event(OutputEvent(output=content))
+
+
+class TableAgent(Agent):
+    """Agent used for explaining integrating agents with Table.
+
+    Because pemja will find action in this class when execute Agent, we can't
+    define this class directly in example.py for module name will be set
+    to __main__.
+    """
+
+    @action(InputEvent)
+    @staticmethod
+    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
+        input = event.input
+        content = input
+        content["review"] += " first action"
+        ctx.send_event(MyEvent(value=content))
+
+    @action(MyEvent)
+    @staticmethod
+    def second_action(event: Event, ctx: RunnerContext):  # noqa D102
+        input = event.value
+        content = input
+        content["review"] += " second action"
+        ctx.send_event(OutputEvent(output=content))
diff --git a/python/flink_agents/examples/resources/input_data.txt b/python/flink_agents/examples/resources/input_data.txt
new file mode 100644
index 0000000..13efe8e
--- /dev/null
+++ b/python/flink_agents/examples/resources/input_data.txt
@@ -0,0 +1,10 @@
+{"id":1,"review":"Great product! Works perfectly and lasts a long time.","review_score":3.0}
+{"id":2,"review":"The item arrived damaged, and the packaging was poor.","review_score":5.0}
+{"id":3,"review":"Highly satisfied with the performance and value for money.","review_score":7.0}
+{"id":3,"review":"Not as good as expected. It stopped working after a week.","review_score":8.0}
+{"id":1,"review":"Fast shipping and excellent customer service. Would buy again!","review_score":8.0}
+{"id":2,"review":"Too complicated to set up. Instructions were unclear.","review_score":8.0}
+{"id":2,"review":"Good quality, but overview_scored for what it does.","review_score":8.0}
+{"id":1,"review":"Exactly what I needed. Easy to use and very reliable.","review_score":8.0}
+{"id":2,"review":"Worst purchase ever. Waste of money and time.","review_score":8.0}
+{"id":2,"review":"Looks nice and functions well, but could be more durable.","review_score":8.0}
\ No newline at end of file
diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py
index 5ea6197..e37d3a8 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -15,11 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 #################################################################################
+from typing import Dict, List
+
+from pydantic import BaseModel
+
 from flink_agents.api.agent import Agent
 from flink_agents.plan.action import Action
 from flink_agents.plan.function import PythonFunction
-from pydantic import BaseModel
-from typing import Dict, List
 
 
 class AgentPlan(BaseModel):
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py b/python/flink_agents/plan/tests/test_agent_plan.py
index a268770..6912044 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -20,12 +20,12 @@ from pathlib import Path
 
 import pytest
 
+from flink_agents.api.agent import Agent
 from flink_agents.api.decorators import action
 from flink_agents.api.event import Event, InputEvent, OutputEvent
 from flink_agents.api.runner_context import RunnerContext
-from flink_agents.api.agent import Agent
-from flink_agents.plan.function import PythonFunction
 from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.plan.function import PythonFunction
 
 
 class TestAgent(Agent): #noqa D101
diff --git a/python/flink_agents/runtime/local_execution_environment.py b/python/flink_agents/runtime/local_execution_environment.py
index 119354a..e2865f1 100644
--- a/python/flink_agents/runtime/local_execution_environment.py
+++ b/python/flink_agents/runtime/local_execution_environment.py
@@ -17,50 +17,97 @@
 
#################################################################################
 from typing import Any, Dict, List
 
-from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from pyflink.common import TypeInformation
+from pyflink.datastream import DataStream, KeySelector, 
StreamExecutionEnvironment
+from pyflink.table import Schema, StreamTableEnvironment, Table
+
 from flink_agents.api.agent import Agent
+from flink_agents.api.execution_environment import (
+    AgentBuilder,
+    AgentsExecutionEnvironment,
+)
 from flink_agents.runtime.local_runner import LocalRunner
 
 
-class LocalExecutionEnvironment(AgentsExecutionEnvironment):
-    """Implementation of AgentsExecutionEnvironment for local execution 
environment."""
+class LocalAgentBuilder(AgentBuilder):
+    """LocalAgentBuilder for building agent instance."""
 
+    __env: "LocalExecutionEnvironment"
     __input: List[Dict[str, Any]]
     __output: List[Any]
     __runner: LocalRunner = None
     __executed: bool = False
 
-    def __init__(self) -> None:
+    def __init__(
+        self, env: "LocalExecutionEnvironment", input: List[Dict[str, Any]]
+    ) -> None:
         """Init empty output list."""
-        self.__output = []
-
-    def from_list(self, input: list) -> 'AgentsExecutionEnvironment':
-        """Set input list of execution environment."""
+        self.__env = env
         self.__input = input
-        return self
+        self.__output = []
 
-    def apply(self, agent: Agent) -> 'AgentsExecutionEnvironment':
+    def apply(self, agent: Agent) -> AgentBuilder:
         """Create local runner to execute given agent.
 
-        Doesn't support apply multiple agents.
+        Doesn't support apply multiple Agents.
         """
         if self.__runner is not None:
-            err_msg = "LocalExecutionEnvironment doesn't support apply 
multiple agents."
+            err_msg = "LocalAgentBuilder doesn't support apply multiple 
agents."
             raise RuntimeError(err_msg)
         self.__runner = LocalRunner(agent)
+        self.__env.set_agent(self.__input, self.__output, self.__runner)
         return self
 
-    def to_list(self) -> list:
+    def to_list(self) -> List[Dict[str, Any]]:
         """Get output list of execution environment."""
         return self.__output
 
-    def execute(self) -> None:
-        """Execute agents.
+    def to_datastream(self) -> DataStream:
+        """Get output DataStream of agent execution.
 
-        Doesn't support execute multiple times.
+        This method is not supported for LocalAgentBuilder.
         """
+        msg = "LocalAgentBuilder does not support to_datastream."
+        raise NotImplementedError(msg)
+
+    def to_table(self, schema: Schema, output_type: TypeInformation) -> Table:
+        """Get output Table of agent execution.
+
+        This method is not supported for LocalAgentBuilder.
+        """
+        msg = "LocalAgentBuilder does not support to_table."
+        raise NotImplementedError(msg)
+
+
+class LocalExecutionEnvironment(AgentsExecutionEnvironment):
+    """Implementation of AgentsExecutionEnvironment for local execution 
environment."""
+
+    __input: List[Dict[str, Any]] = None
+    __output: List[Any] = None
+    __runner: LocalRunner = None
+    __executed: bool = False
+
+    def from_list(self, input: list) -> LocalAgentBuilder:
+        """Set input list of execution environment."""
+        if self.__input is not None:
+            err_msg = "LocalExecutionEnvironment doesn't support call 
from_list multiple times."
+            raise RuntimeError(err_msg)
+
+        self.__input = input
+        return LocalAgentBuilder(env=self, input=input)
+
+    def set_agent(self, input: list, output: list, runner: LocalRunner) -> 
None:
+        """Set agent input, output and runner."""
+        self.__input = input
+        self.__runner = runner
+        self.__output = output
+
+    def execute(self) -> None:
+        """Execute agent individually."""
         if self.__executed:
-            err_msg = "LocalExecutionEnvironment doesn't support execute 
multiple times."
+            err_msg = (
+                "LocalExecutionEnvironment doesn't support execute multiple 
times."
+            )
             raise RuntimeError(err_msg)
         self.__executed = True
         for input in self.__input:
@@ -69,12 +116,37 @@ class 
LocalExecutionEnvironment(AgentsExecutionEnvironment):
         for output in outputs:
             self.__output.append(output)
 
+    def from_datastream(
+        self, input: DataStream, key_selector: KeySelector = None
+    ) -> AgentBuilder:
+        """Set input DataStream of agent execution.
+
+        This method is not supported for local execution environments.
+        """
+        msg = "LocalExecutionEnvironment does not support from_datastream."
+        raise NotImplementedError(msg)
+
+    def from_table(
+        self,
+        input: Table,
+        t_env: StreamTableEnvironment,
+        key_selector: KeySelector = None,
+    ) -> AgentBuilder:
+        """Set input Table of agent execution.
+
+        This method is not supported for local execution environments.
+        """
+        msg = "LocalExecutionEnvironment does not support from_table."
+        raise NotImplementedError(msg)
+
 
-def get_execution_environment(**kwargs: Dict[str, Any]) -> 
AgentsExecutionEnvironment:
-    """Factory function to create a local agents execution environment.
+def create_instance(env: StreamExecutionEnvironment, **kwargs: Dict[str, Any]) 
-> AgentsExecutionEnvironment:
+    """Factory function to create a remote agents execution environment.
 
     Parameters
     ----------
+    env : StreamExecutionEnvironment
+        Flink job execution environment.
     **kwargs : Dict[str, Any]
         The dict of parameters to configure the execution environment.
 
diff --git a/python/flink_agents/runtime/local_runner.py 
b/python/flink_agents/runtime/local_runner.py
index 8149c2d..dfbb582 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -22,9 +22,9 @@ from typing import Any, Dict, List
 
 from typing_extensions import override
 
+from flink_agents.api.agent import Agent
 from flink_agents.api.event import Event, InputEvent, OutputEvent
 from flink_agents.api.runner_context import RunnerContext
-from flink_agents.api.agent import Agent
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.runtime.agent_runner import AgentRunner
 
diff --git a/python/flink_agents/runtime/python_java_utils.py 
b/python/flink_agents/runtime/python_java_utils.py
index 0abd160..a4bf1b8 100644
--- a/python/flink_agents/runtime/python_java_utils.py
+++ b/python/flink_agents/runtime/python_java_utils.py
@@ -1,22 +1,24 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
 #
-#   http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
 from typing import Any
 
 import cloudpickle
+
 from flink_agents.api.event import InputEvent
 
 
@@ -30,4 +32,4 @@ def wrap_to_input_event(bytesObject: bytes) -> bytes:
 
 def get_output_from_output_event(bytesObject: bytes) -> Any:
     """Get output data from OutputEvent and serialize."""
-    return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
\ No newline at end of file
+    return cloudpickle.dumps(convert_to_python_object(bytesObject).output)
diff --git a/python/flink_agents/runtime/remote_execution_environment.py 
b/python/flink_agents/runtime/remote_execution_environment.py
new file mode 100644
index 0000000..3f6aae1
--- /dev/null
+++ b/python/flink_agents/runtime/remote_execution_environment.py
@@ -0,0 +1,223 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, Dict, List, Optional
+
+import cloudpickle
+from pyflink.common import TypeInformation
+from pyflink.common.typeinfo import (
+    PickledBytesTypeInfo,
+)
+from pyflink.datastream import (
+    DataStream,
+    KeyedStream,
+    KeySelector,
+    StreamExecutionEnvironment,
+)
+from pyflink.table import Schema, StreamTableEnvironment, Table
+from pyflink.util.java_utils import invoke_method
+
+from flink_agents.api.agent import Agent
+from flink_agents.api.execution_environment import (
+    AgentBuilder,
+    AgentsExecutionEnvironment,
+)
+from flink_agents.plan.agent_plan import AgentPlan
+
+
+class RemoteAgentBuilder(AgentBuilder):
+    """RemoteAgentBuilder for integrating datastream/table and agent."""
+
+    __input: DataStream
+    __agent_plan: AgentPlan = None
+    __output: DataStream = None
+    __t_env: StreamTableEnvironment
+
+    def __init__(
+        self, input: DataStream, t_env: Optional[StreamTableEnvironment] = None
+    ) -> None:
+        """Init method of RemoteAgentBuilder."""
+        self.__input = input
+        self.__t_env = t_env
+
+    def apply(self, agent: Agent) -> "AgentBuilder":
+        """Set agent of execution environment.
+
+        Parameters
+        ----------
+        agent : Agent
+            The agent user defined to run in execution environment.
+        """
+        if self.__agent_plan is not None:
+            err_msg = "RemoteAgentBuilder doesn't support apply multiple 
agents yet."
+            raise RuntimeError(err_msg)
+        self.__agent_plan = AgentPlan.from_agent(agent)
+        return self
+
+    def to_datastream(
+        self, output_type: Optional[TypeInformation] = None
+    ) -> DataStream:
+        """Get output datastream of agent execution.
+
+        Returns:
+        -------
+        DataStream
+            Output datastream of agent execution.
+        """
+        if self.__agent_plan is None:
+            err_msg = "Must apply agent before call to_datastream/to_table."
+            raise RuntimeError(err_msg)
+
+        # return the same output datastream when call to_datastream multiple.
+        if self.__output is None:
+            j_data_stream_output = invoke_method(
+                None,
+                "org.apache.flink.agents.runtime.CompileUtils",
+                "connectToAgent",
+                [
+                    self.__input._j_data_stream,
+                    self.__agent_plan.model_dump_json(serialize_as_any=True),
+                ],
+                [
+                    "org.apache.flink.streaming.api.datastream.KeyedStream",
+                    "java.lang.String",
+                ],
+            )
+            output_stream = DataStream(j_data_stream_output)
+            self.__output = output_stream.map(
+                lambda x: cloudpickle.loads(x), output_type=output_type
+            )
+        return self.__output
+
+    def to_table(self, schema: Schema, output_type: TypeInformation) -> Table:
+        """Get output Table of agent execution.
+
+        Parameters
+        ----------
+        schema : Schema
+            Indicate schema of the output table.
+        output_type : TypeInformation
+            Indicate schema corresponding type information.
+
+        Returns:
+        -------
+        Table
+            Output Table of agent execution.
+        """
+        return self.__t_env.from_data_stream(self.to_datastream(output_type), 
schema)
+
+    def to_list(self) -> List[Dict[str, Any]]:
+        """Get output list of agent execution.
+
+        This method is not supported for remote execution environments.
+        """
+        msg = "RemoteAgentBuilder does not support to_list."
+        raise NotImplementedError(msg)
+
+
+class RemoteExecutionEnvironment(AgentsExecutionEnvironment):
+    """Implementation of AgentsExecutionEnvironment for execution with 
DataStream."""
+
+    __env: StreamExecutionEnvironment
+
+    def __init__(self, env: StreamExecutionEnvironment) -> None:
+        """Init method of RemoteExecutionEnvironment."""
+        self.__env = env
+
+    @staticmethod
+    def __process_input_datastream(
+        input: DataStream, key_selector: Optional[KeySelector] = None
+    ) -> KeyedStream:
+        if isinstance(input, KeyedStream):
+            return input
+        else:
+            if key_selector is None:
+                msg = "KeySelector must be provided."
+                raise RuntimeError(msg)
+            input = input.key_by(key_selector)
+            return input
+
+    def from_datastream(
+        self, input: DataStream, key_selector: KeySelector = None
+    ) -> RemoteAgentBuilder:
+        """Set input datastream of agent.
+
+        Parameters
+        ----------
+        input : DataStream
+            Receive a DataStream as input.
+        key_selector : KeySelector
+            Extract key from each input record, must not be None when input is
+            not KeyedStream.
+        """
+        input = self.__process_input_datastream(input, key_selector)
+
+        return RemoteAgentBuilder(input=input)
+
+    def from_table(
+        self,
+        input: Table,
+        t_env: StreamTableEnvironment,
+        key_selector: Optional[KeySelector] = None,
+    ) -> AgentBuilder:
+        """Set input Table of agent.
+
+        Parameters
+        ----------
+        input : Table
+            Receive a Table as input.
+        t_env: StreamTableEnvironment
+            table environment supports convert table to/from datastream.
+        key_selector : KeySelector
+            Extract key from each input record.
+        """
+        input = t_env.to_data_stream(table=input)
+
+        input = input.map(lambda x: x, output_type=PickledBytesTypeInfo())
+
+        input = self.__process_input_datastream(input, key_selector)
+        return RemoteAgentBuilder(input=input, t_env=t_env)
+
+    def from_list(self, input: List[Dict[str, Any]]) -> 
"AgentsExecutionEnvironment":
+        """Set input list of agent execution.
+
+        This method is not supported for remote execution environments.
+        """
+        msg = "RemoteExecutionEnvironment does not support from_list."
+        raise NotImplementedError(msg)
+
+    def execute(self) -> None:
+        """Execute agent."""
+        self.__env.execute()
+
+
+def create_instance(env: StreamExecutionEnvironment, **kwargs: Dict[str, Any]) 
-> AgentsExecutionEnvironment:
+    """Factory function to create a remote agents execution environment.
+
+    Parameters
+    ----------
+    env : StreamExecutionEnvironment
+        Flink job execution environment.
+    **kwargs : Dict[str, Any]
+        The dict of parameters to configure the execution environment.
+
+    Returns:
+    -------
+    AgentsExecutionEnvironment
+        A configured agents execution environment instance.
+    """
+    return RemoteExecutionEnvironment(env=env)
diff --git 
a/python/flink_agents/runtime/tests/test_local_execution_environment.py 
b/python/flink_agents/runtime/tests/test_local_execution_environment.py
index 6857f3e..4aaf521 100644
--- a/python/flink_agents/runtime/tests/test_local_execution_environment.py
+++ b/python/flink_agents/runtime/tests/test_local_execution_environment.py
@@ -17,11 +17,11 @@
 
#################################################################################
 import pytest
 
+from flink_agents.api.agent import Agent
 from flink_agents.api.decorators import action
 from flink_agents.api.event import Event, InputEvent, OutputEvent
-from flink_agents.api.execution_enviroment import AgentsExecutionEnvironment
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
 from flink_agents.api.runner_context import RunnerContext
-from flink_agents.api.agent import Agent
 
 
 class TestAgent1(Agent): # noqa: D101
@@ -80,3 +80,12 @@ def test_local_execution_environment_execute_multi_times() 
-> None: # noqa: D103
     env.execute()
     with pytest.raises(RuntimeError):
         env.execute()
+
+def test_local_execution_environment_call_from_list_twice() -> None:  # noqa: 
D103
+    env = AgentsExecutionEnvironment.get_execution_environment()
+
+    input_list = []
+
+    env.from_list(input_list)
+    with pytest.raises(RuntimeError):
+        env.from_list(input_list)
diff --git a/python/requirements/test_requirements.txt 
b/python/requirements/test_requirements.txt
index 11ddd75..52a14f7 100644
--- a/python/requirements/test_requirements.txt
+++ b/python/requirements/test_requirements.txt
@@ -16,3 +16,4 @@ wheel
 setuptools>=75.3
 pytest==8.4.0
 pydantic==2.11.4
+apache-flink==1.20.1

Reply via email to