This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new ac5769b  [docs] Update integrate with flink doc. (#241)
ac5769b is described below

commit ac5769bd0627e0f9645c88c8415bc60e952992b4
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue Sep 30 18:49:19 2025 +0800

    [docs] Update integrate with flink doc. (#241)
---
 .../docs/development/integrate_with_flink.md       | 64 ++++++++++++++++++++--
 1 file changed, 59 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/development/integrate_with_flink.md 
b/docs/content/docs/development/integrate_with_flink.md
index 6b1b65f..676eb9d 100644
--- a/docs/content/docs/development/integrate_with_flink.md
+++ b/docs/content/docs/development/integrate_with_flink.md
@@ -21,15 +21,69 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-
+## Overview
+
+Flink Agents is an Agentic AI framework built on Apache Flink. By integrating
+agents with Flink DataStream/Table, Flink Agents can leverage Flink's powerful
+data processing capabilities.
+
 ## From/To Flink DataStream API
 
-{{< hint warning >}}
-**TODO**: How to integrate with Flink DataStream API.
-{{< /hint >}}
+First, obtain the Flink `StreamExecutionEnvironment` and the Flink Agents
+`AgentsExecutionEnvironment`.
+
+```python
+# Set up the Flink streaming environment and the Agents execution environment.
+env = StreamExecutionEnvironment.get_execution_environment()
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
+```
+
+Integrate the agent with the input `DataStream`; the returned output
+`DataStream` can then be consumed by downstream operators.
+
+```python
+# Create the input datastream.
+input_stream = env.from_source(...)
+
+# integrate agent with input datastream, and return output datastream
+output_stream = (
+    agents_env.from_datastream(
+        input=input_stream, key_selector=lambda x: x.id
+    )
+    .apply(your_agent)
+    .to_datastream()
+)
+
+# Consume the agent output datastream.
+output_stream.print()
+```
+
+The input `DataStream` must be a `KeyedStream`; otherwise, the user should
+provide a `KeySelector` telling Flink Agents how to convert the input
+`DataStream` into a `KeyedStream`.
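
As a plain-Python illustration (independent of PyFlink), a key selector is just a function from a record to its key; the `Review` record and its `id` field below are hypothetical, standing in for whatever elements flow through the input stream:

```python
from dataclasses import dataclass

# Hypothetical record type standing in for the elements of the input stream.
@dataclass
class Review:
    id: int
    text: str

# The key selector maps each record to its key; records that share a key
# are routed to the same keyed agent instance.
key_selector = lambda x: x.id

records = [Review(1, "great"), Review(2, "poor"), Review(1, "okay")]
print([key_selector(r) for r in records])  # [1, 2, 1]
```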
 
 ## From/To Flink Table API
 
+First, obtain the Flink `StreamExecutionEnvironment` and
+`StreamTableEnvironment`, and the Flink Agents `AgentsExecutionEnvironment`.
+```python
+# Set up the Flink streaming environment and table environment
+env = StreamExecutionEnvironment.get_execution_environment()
+t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+# Set up the Flink Agents execution environment.
+agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env, 
t_env=t_env)
+```
+
+Integrate the agent with the input `Table`; the returned output `Table` can
+then be consumed by downstream operators.
+
+```python
+output_type = ExternalTypeInfo(RowTypeInfo(
+    [BasicTypeInfo.INT_TYPE_INFO()],
+    ["result"],
+))
+
+schema = Schema.new_builder().column("result", DataTypes.INT()).build()
+
+output_table = (
+    agents_env.from_table(input=input_table, key_selector=MyKeySelector())
+    .apply(agent)
+    .to_table(schema=schema, output_type=output_type)
+)
+```
+
+The user should provide a `KeySelector` in `from_table()` to tell Flink Agents
+how to convert the input `Table` to a `KeyedStream` internally, and provide a
+`Schema` and `TypeInformation` in `to_table()` to define the output `Table`
+schema.
+
 {{< hint warning >}}
-**TODO**: How to integrate with Flink Table API.
+__Note:__ Currently, the user must provide both `Schema` and `TypeInformation`
+when calling `to_table()`; supporting just one of them is planned for the future.
 {{< /hint >}}
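
Because both descriptions are currently required, an easy mistake is letting them drift apart. As a plain-Python illustration (no PyFlink imports; the column lists are hypothetical), the `Schema` columns and the `TypeInformation` fields must agree in name, order, and type:

```python
# Columns as declared in the Table Schema (name, type) -- illustrative only.
schema_columns = [("result", "INT")]

# Fields as declared in the RowTypeInfo -- these must mirror the Schema.
type_info_fields = [("result", "INT")]

# A quick consistency check one might run while writing the job.
def consistent(schema_cols, type_fields):
    return schema_cols == type_fields

print(consistent(schema_columns, type_info_fields))  # True
```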
\ No newline at end of file
