This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new ac5769b [docs] Update integrate with flink doc. (#241)
ac5769b is described below
commit ac5769bd0627e0f9645c88c8415bc60e952992b4
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue Sep 30 18:49:19 2025 +0800
[docs] Update integrate with flink doc. (#241)
---
.../docs/development/integrate_with_flink.md | 64 ++++++++++++++++++++--
1 file changed, 59 insertions(+), 5 deletions(-)
diff --git a/docs/content/docs/development/integrate_with_flink.md
b/docs/content/docs/development/integrate_with_flink.md
index 6b1b65f..676eb9d 100644
--- a/docs/content/docs/development/integrate_with_flink.md
+++ b/docs/content/docs/development/integrate_with_flink.md
@@ -21,15 +21,69 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-
+## Overview
+Flink Agents is an agentic AI framework based on Apache Flink. By integrating
agents with the Flink DataStream and Table APIs, Flink Agents can leverage
Flink's data processing capabilities.
## From/To Flink DataStream API
-{{< hint warning >}}
-**TODO**: How to integrate with Flink DataStream API.
-{{< /hint >}}
+First, get the Flink `StreamExecutionEnvironment` and the Flink Agents
`AgentsExecutionEnvironment`.
+
+```python
+# Set up the Flink streaming environment and the Agents execution environment.
+env = StreamExecutionEnvironment.get_execution_environment()
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+```
+
+Apply the agent to the input `DataStream` to obtain an output
`DataStream` that downstream operators can consume.
+
+```python
+# create input datastream
+input_stream = env.from_source(...)
+
+# integrate agent with input datastream, and return output datastream
+output_stream = (
+ agents_env.from_datastream(
+ input=input_stream, key_selector=lambda x: x.id
+ )
+ .apply(your_agent)
+ .to_datastream()
+)
+
+# consume the agent output datastream
+output_stream.print()
+```
+
+The input `DataStream` must be a `KeyedStream`; otherwise, the user must provide a
`KeySelector` that specifies how to convert the input `DataStream` into a `KeyedStream`.
## From/To Flink Table API
+First, get the Flink `StreamExecutionEnvironment`,
`StreamTableEnvironment`, and the Flink Agents `AgentsExecutionEnvironment`.
+```python
+# Set up the Flink streaming environment and table environment
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+# Set up the Flink Agents execution environment
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env,
t_env=t_env)
+```
+
+Apply the agent to the input `Table` to obtain an output `Table` that
downstream operators can consume.
+
+```python
+output_type = ExternalTypeInfo(RowTypeInfo(
+ [BasicTypeInfo.INT_TYPE_INFO()],
+ ["result"],
+))
+
+schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
+
+output_table = (
+ agents_env.from_table(input=input_table, key_selector=MyKeySelector())
+ .apply(agent)
+ .to_table(schema=schema, output_type=output_type)
+)
+```
+
+The user must provide a `KeySelector` in `from_table()` that specifies how to convert the
input `Table` into a `KeyedStream` internally, and a `Schema` and
`TypeInformation` in `to_table()` that define the output `Table` schema.
{{< hint warning >}}
-**TODO**: How to integrate with Flink Table API.
+__Note:__ Currently, the user must provide both `Schema` and `TypeInformation`
when calling `to_table()`. Providing only one of them will be supported in the future.
{{< /hint >}}
\ No newline at end of file