This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 26eaffce [docs] Polish integrate_with_flink doc: self-contained snippets and language-scoped notes (#800)
26eaffce is described below
commit 26eaffceff870fa02b11ec1108f381e5c9e155af
Author: bosiew.tian <[email protected]>
AuthorDate: Mon Jun 8 17:54:28 2026 +0800
[docs] Polish integrate_with_flink doc: self-contained snippets and language-scoped notes (#800)
- Fix Overview grammar: "By integrating agents with Flink DataStream/Table".
- Make the Python/Java DataStream and Table snippets self-contained: define
YourPojo / MyKeySelector / RowKeySelector, replace bare "..." sources with
concrete calls, add the real imports, and link to runnable examples.
- Correct the misleading Java "always a nested row" comment: the agent output
is exposed as a single anonymous column f0; nest as ROW only for composite
output (matches FlinkIntegrationTest.testFromTableToTable's flat f0 STRING).
- Document the Python/Java output-column naming difference (named RowTypeInfo
fields vs the Java f0 default).
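As a rough illustration of the naming difference described in the last bullet,
the plain-Python sketch below models one output record on each side. It is a
toy model only: dicts stand in for Flink rows, and the helper names
`python_output_row` and `java_output_row` are hypothetical, not part of the
Flink Agents API.

```python
from typing import Any, Dict, Sequence


def python_output_row(values: Sequence[Any], field_names: Sequence[str]) -> Dict[str, Any]:
    """Hypothetical toy model of the Python side: the RowTypeInfo field names
    passed to to_table() become the output column names."""
    if len(values) != len(field_names):
        raise ValueError("need exactly one value per RowTypeInfo field name")
    return dict(zip(field_names, values))


def java_output_row(agent_output: Any) -> Dict[str, Any]:
    """Hypothetical toy model of the Java side: toTable(Schema) exposes the
    whole agent output as one anonymous column named "f0". A composite output
    stays nested inside f0, which is why the Schema declares f0 as ROW(...)
    only in that case."""
    return {"f0": agent_output}


if __name__ == "__main__":
    # Python: the column name comes from RowTypeInfo([...], ["result"]).
    print(python_output_row([42], ["result"]))  # {'result': 42}
    # Java, scalar output: f0 is declared with a scalar type such as STRING.
    print(java_output_row("hello"))  # {'f0': 'hello'}
    # Java, composite output: f0 holds a nested row.
    print(java_output_row({"result": 1.5}))  # {'f0': {'result': 1.5}}
```

The point of the model is only that Python lets you choose the column names,
while Java always starts from a single f0 column.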
Closes #777
Co-authored-by: bosiew.tian <[email protected]>
---
.../docs/development/integrate_with_flink.md | 113 +++++++++++++++++----
1 file changed, 95 insertions(+), 18 deletions(-)
diff --git a/docs/content/docs/development/integrate_with_flink.md b/docs/content/docs/development/integrate_with_flink.md
index b3bc193f..8a115e1c 100644
--- a/docs/content/docs/development/integrate_with_flink.md
+++ b/docs/content/docs/development/integrate_with_flink.md
@@ -22,7 +22,7 @@ specific language governing permissions and limitations
under the License.
-->
## Overview
-Flink Agents is an Agentic AI framework based on Apache Flink. By integrate agent with flink DataStream/Table, Flink Agents can leverage the powerful data processing ability of Flink.
+Flink Agents is an Agentic AI framework based on Apache Flink. By integrating agents with Flink DataStream/Table, Flink Agents can leverage the powerful data processing ability of Flink.
## From/To Flink DataStream API
First of all, get the flink `StreamExecutionEnvironment` and flink-agents `AgentsExecutionEnvironment`.
@@ -55,8 +55,14 @@ Integrate the agent with input `DataStream`, and return the output `DataStream`
{{< tab "Python" >}}
```python
+from pyflink.common import WatermarkStrategy
+
# create input datastream
-input_stream = env.from_source(...)
+input_stream = env.from_source(
+    source=your_source,
+    watermark_strategy=WatermarkStrategy.no_watermarks(),
+    source_name="your_source_name",
+)
# integrate agent with input datastream, and return output datastream
output_stream = (
@@ -73,9 +79,31 @@ output_stream.print()
{{< /tab >}}
{{< tab "Java" >}}
+```java
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+// A minimal Flink POJO used as the input element type. A Flink POJO must
+// have a public no-arg constructor and public (or getter/setter-accessible) fields.
+public static class YourPojo {
+    public String id;
+
+    public YourPojo() {}
+
+    public YourPojo(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+}
+```
+
```java
// create input datastream
-DataStream<YourPojo> inputStream = env.fromSource(...);
+DataStream<YourPojo> inputStream =
+        env.fromElements(new YourPojo("item1"), new YourPojo("item2"));
// integrate agent with input datastream, and return output datastream
DataStream<Object> outputStream =
@@ -93,6 +121,8 @@ outputStream.print();
The input `DataStream` must be `KeyedStream`, or user should provide
`KeySelector` to tell how to convert the input `DataStream` to `KeyedStream`.
+For complete, runnable examples, see [`WorkflowSingleAgentExample.java`](https://github.com/apache/flink-agents/blob/main/examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java) (Java) and [`workflow_single_agent_example.py`](https://github.com/apache/flink-agents/blob/main/python/flink_agents/examples/quickstart/workflow_single_agent_example.py) (Python).
+
## From/To Flink Table API
First of all, get the flink `StreamExecutionEnvironment`, `StreamTableEnvironment`, and flink-agents `AgentsExecutionEnvironment`.
@@ -131,8 +161,26 @@ Integrate the agent with input `Table`, and return the output `Table` can be con
{{< tab "Python" >}}
```python
-input_table = t_env.from_elements(...)
-
+from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo
+from pyflink.datastream import KeySelector
+from pyflink.table import DataTypes, Schema
+
+
+# Tell from_table how to derive the key used to convert the input Table to a
+# KeyedStream internally.
+class MyKeySelector(KeySelector):
+    def get_key(self, value):
+        return value.id
+
+
+# create input table (here a small in-memory table; replace with your own source)
+input_table = t_env.from_elements(
+    [(1, "hello"), (2, "world")],
+    ["id", "input"],
+)
+
+# The output TypeInformation and Schema must be mutually consistent: both
+# describe a single "result" INT column here.
output_type = ExternalTypeInfo(RowTypeInfo(
[BasicTypeInfo.INT_TYPE_INFO()],
["result"],
@@ -142,7 +190,7 @@ schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
output_table = (
agents_env.from_table(input=input_table, key_selector=MyKeySelector())
- .apply(agent)
+ .apply(your_agent)
.to_table(schema=schema, output_type=output_type)
)
```
@@ -150,21 +198,43 @@ output_table = (
{{< tab "Java" >}}
```java
-Table inputTable = tableEnv.fromValues(...);
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+
+// Key selector that extracts the key from each input Row (here, field 0 / the "id" column).
+public static class RowKeySelector implements KeySelector<Object, Integer> {
+    @Override
+    public Integer getKey(Object value) {
+        Row row = (Row) value;
+        return (Integer) row.getField(0);
+    }
+}
+```
-// Here the output schema should always be a nested row, of which
-// the f0 column is the expected row.
-Schema outputSchema =
- Schema.newBuilder()
- .column("f0", DataTypes.ROW(DataTypes.FIELD("result", DataTypes.DOUBLE())))
- .build();
+```java
+Table inputTable =
+        tableEnv.fromValues(
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("score", DataTypes.DOUBLE())),
+                Row.of(1, "Alice", 85.5),
+                Row.of(2, "Bob", 92.0),
+                Row.of(3, "Charlie", 78.3));
+
+// The agent output is exposed as a single anonymous column named "f0".
+// Declare "f0" with the agent's OUTPUT type: a scalar type (e.g. DataTypes.STRING())
+// when the agent emits a scalar value, or a nested DataTypes.ROW(...) only when the
+// agent emits a composite row.
+Schema outputSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build();
Table outputTable =
agentsEnv
- .fromTable(
- inputTable,
- myKeySelector)
- .apply(agent)
+ .fromTable(inputTable, new RowKeySelector())
+ .apply(yourAgent)
.toTable(outputSchema);
```
{{< /tab >}}
@@ -179,6 +249,13 @@ The arguments required by `to_table()` differ by language:
 - **Python**: provide both `Schema` and `TypeInformation` to define the output `Table` schema.
 - **Java**: provide only `Schema` (`toTable(Schema)`); `TypeInformation` is not required.
+The two languages also name the output columns differently:
+
+- **Python**: the `TypeInformation` passed to `to_table()` is a `RowTypeInfo` whose field names become the output columns, so you name them directly (the `"result"` column above matches `RowTypeInfo([...], ["result"])`).
+- **Java**: `toTable(Schema)` exposes the agent output as a single anonymous column named `f0` (internally it calls `StreamTableEnvironment.fromDataStream(DataStream<Object>, schema)`), so the `Schema` must reference `f0`; wrap it in a `ROW(...)` only when the agent emits a composite row.
+
{{< hint info >}}
In Python, `to_table()` currently requires both `Schema` and `TypeInformation`; we plan to support providing only one of them in the future.
-{{< /hint >}}
\ No newline at end of file
+{{< /hint >}}
+
+For complete, runnable examples, see [`WorkflowMultipleAgentExample.java`](https://github.com/apache/flink-agents/blob/main/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java) (Java) and [`workflow_multiple_agent_example.py`](https://github.com/apache/flink-agents/blob/main/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py) (Python).