This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 26eaffce [docs] Polish integrate_with_flink doc: self-contained snippets and language-scoped notes (#800)
26eaffce is described below

commit 26eaffceff870fa02b11ec1108f381e5c9e155af
Author: bosiew.tian <[email protected]>
AuthorDate: Mon Jun 8 17:54:28 2026 +0800

    [docs] Polish integrate_with_flink doc: self-contained snippets and language-scoped notes (#800)
    
    - Fix Overview grammar: "By integrating agents with Flink DataStream/Table".
    - Make the Python/Java DataStream and Table snippets self-contained: define
      YourPojo / MyKeySelector / RowKeySelector, replace bare "..." sources with
      concrete calls, add the real imports, and link to runnable examples.
    - Correct the misleading Java "always a nested row" comment: the agent output
      is exposed as a single anonymous column f0; nest as ROW only for composite
      output (matches FlinkIntegrationTest.testFromTableToTable's flat f0 STRING).
    - Document the Python/Java output-column naming difference (named RowTypeInfo
      fields vs the Java f0 default).
    
    Closes #777
    
    Co-authored-by: bosiew.tian <[email protected]>
---
 .../docs/development/integrate_with_flink.md       | 113 +++++++++++++++++----
 1 file changed, 95 insertions(+), 18 deletions(-)

diff --git a/docs/content/docs/development/integrate_with_flink.md b/docs/content/docs/development/integrate_with_flink.md
index b3bc193f..8a115e1c 100644
--- a/docs/content/docs/development/integrate_with_flink.md
+++ b/docs/content/docs/development/integrate_with_flink.md
@@ -22,7 +22,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 ## Overview
-Flink Agents is an Agentic AI framework based on Apache Flink. By integrate agent with flink DataStream/Table, Flink Agents can leverage the powerful data processing ability of Flink. 
+Flink Agents is an Agentic AI framework based on Apache Flink. By integrating agents with Flink DataStream/Table, Flink Agents can leverage the powerful data processing ability of Flink.
 ## From/To Flink DataStream API
 
 First of all, get the flink `StreamExecutionEnvironment` and flink-agents `AgentsExecutionEnvironment`.
@@ -55,8 +55,14 @@ Integrate the agent with input `DataStream`, and return the output `DataStream`
 
 {{< tab "Python" >}}
 ```python
+from pyflink.common import WatermarkStrategy
+
 # create input datastream
-input_stream = env.from_source(...)
+input_stream = env.from_source(
+    source=your_source,
+    watermark_strategy=WatermarkStrategy.no_watermarks(),
+    source_name="your_source_name",
+)
 
 # integrate agent with input datastream, and return output datastream
 output_stream = (
@@ -73,9 +79,31 @@ output_stream.print()
 {{< /tab >}}
 
 {{< tab "Java" >}}
+```java
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+// A minimal Flink POJO used as the input element type. A Flink POJO must
+// have a public no-arg constructor and public (or getter/setter-accessible) fields.
+public static class YourPojo {
+    public String id;
+
+    public YourPojo() {}
+
+    public YourPojo(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+}
+```
+
 ```java
 // create input datastream
-DataStream<YourPojo> inputStream = env.fromSource(...);
+DataStream<YourPojo> inputStream =
+        env.fromElements(new YourPojo("item1"), new YourPojo("item2"));
 
 // integrate agent with input datastream, and return output datastream
 DataStream<Object> outputStream =
@@ -93,6 +121,8 @@ outputStream.print();
 
 The input `DataStream` must be `KeyedStream`, or user should provide `KeySelector` to tell how to convert the input `DataStream` to `KeyedStream`.
 
+For complete, runnable examples, see [`WorkflowSingleAgentExample.java`](https://github.com/apache/flink-agents/blob/main/examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java) (Java) and [`workflow_single_agent_example.py`](https://github.com/apache/flink-agents/blob/main/python/flink_agents/examples/quickstart/workflow_single_agent_example.py) (Python).
+
 ## From/To Flink Table API
 
 First of all, get the flink `StreamExecutionEnvironment`, `StreamTableEnvironment`, and flink-agents `AgentsExecutionEnvironment`.
@@ -131,8 +161,26 @@ Integrate the agent with input `Table`, and return the output `Table` can be con
 
 {{< tab "Python" >}}
 ```python
-input_table = t_env.from_elements(...)
-    
+from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo
+from pyflink.datastream import KeySelector
+from pyflink.table import DataTypes, Schema
+
+
+# Tell from_table how to derive the key used to convert the input Table to a
+# KeyedStream internally.
+class MyKeySelector(KeySelector):
+    def get_key(self, value):
+        return value.id
+
+
+# create input table (here a small in-memory table; replace with your own source)
+input_table = t_env.from_elements(
+    [(1, "hello"), (2, "world")],
+    ["id", "input"],
+)
+
+# The output TypeInformation and Schema must be mutually consistent: both
+# describe a single "result" INT column here.
 output_type = ExternalTypeInfo(RowTypeInfo(
     [BasicTypeInfo.INT_TYPE_INFO()],
     ["result"],
@@ -142,7 +190,7 @@ schema = (Schema.new_builder().column("result", DataTypes.INT())).build()
 
 output_table = (
     agents_env.from_table(input=input_table, key_selector=MyKeySelector())
-    .apply(agent)
+    .apply(your_agent)
     .to_table(schema=schema, output_type=output_type)
 )
 ```
@@ -150,21 +198,43 @@ output_table = (
 
 {{< tab "Java" >}}
 ```java
-Table inputTable = tableEnv.fromValues(...);
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+
+// Key selector that extracts the key from each input Row (here, field 0 / the "id" column).
+public static class RowKeySelector implements KeySelector<Object, Integer> {
+    @Override
+    public Integer getKey(Object value) {
+        Row row = (Row) value;
+        return (Integer) row.getField(0);
+    }
+}
+```
 
-// Here the output schema should always be a nested row, of which
-// the f0 column is the expected row.
-Schema outputSchema =
-        Schema.newBuilder()
-                .column("f0", DataTypes.ROW(DataTypes.FIELD("result", DataTypes.DOUBLE())))
-                .build();
+```java
+Table inputTable =
+        tableEnv.fromValues(
+                DataTypes.ROW(
+                        DataTypes.FIELD("id", DataTypes.INT()),
+                        DataTypes.FIELD("name", DataTypes.STRING()),
+                        DataTypes.FIELD("score", DataTypes.DOUBLE())),
+                Row.of(1, "Alice", 85.5),
+                Row.of(2, "Bob", 92.0),
+                Row.of(3, "Charlie", 78.3));
+
+// The agent output is exposed as a single anonymous column named "f0".
+// Declare "f0" with the agent's OUTPUT type: a scalar type (e.g. DataTypes.STRING())
+// when the agent emits a scalar value, or a nested DataTypes.ROW(...) only when the
+// agent emits a composite row.
+Schema outputSchema = Schema.newBuilder().column("f0", DataTypes.STRING()).build();
 
 Table outputTable =
         agentsEnv
-                .fromTable(
-                        inputTable,
-                        myKeySelector)
-                .apply(agent)
+                .fromTable(inputTable, new RowKeySelector())
+                .apply(yourAgent)
                 .toTable(outputSchema);
 ```
 {{< /tab >}}
@@ -179,6 +249,13 @@ The arguments required by `to_table()` differ by language:
 - **Python**: provide both `Schema` and `TypeInformation` to define the output `Table` schema.
 - **Java**: provide only `Schema` (`toTable(Schema)`); `TypeInformation` is not required.
 
+The two languages also name the output columns differently:
+
+- **Python**: the `TypeInformation` passed to `to_table()` is a `RowTypeInfo` whose field names become the output columns, so you name them directly (the `"result"` column above matches `RowTypeInfo([...], ["result"])`).
+- **Java**: `toTable(Schema)` exposes the agent output as a single anonymous column named `f0` (internally it calls `StreamTableEnvironment.fromDataStream(DataStream<Object>, schema)`), so the `Schema` must reference `f0`; wrap it in a `ROW(...)` only when the agent emits a composite row.
+
 {{< hint info >}}
 In Python, `to_table()` currently requires both `Schema` and `TypeInformation`; we plan to support providing only one of them in the future.
-{{< /hint >}}
\ No newline at end of file
+{{< /hint >}}
+
+For complete, runnable examples, see [`WorkflowMultipleAgentExample.java`](https://github.com/apache/flink-agents/blob/main/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java) (Java) and [`workflow_multiple_agent_example.py`](https://github.com/apache/flink-agents/blob/main/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py) (Python).
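
The doc changed in this commit requires the input to be a `KeyedStream`, or else a key selector (`MyKeySelector`, `RowKeySelector`) that tells Flink how to key it. The sketch below is only a rough mental model of what keying means, written in plain Python without Flink. Each element is mapped to a key (the `id` field, as in `MyKeySelector.get_key`), and elements that share a key are grouped together in arrival order. `Record` and `group_by_key` are hypothetical helpers for illustration, not Flink or Flink Agents APIs. The sketch ignores parallelism, state, and how Flink actually distributes keys across subtasks.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, List, NamedTuple


class Record(NamedTuple):
    """Hypothetical input element with an `id` field, like the rows in the Table snippet."""
    id: int
    input: str


def group_by_key(
    records: Iterable[Record], key_selector: Callable[[Record], Hashable]
) -> Dict[Hashable, List[Record]]:
    """Conceptual model of keying: records that share a key end up in the same
    group, preserving arrival order. This is not Flink's implementation."""
    groups: Dict[Hashable, List[Record]] = defaultdict(list)
    for record in records:
        groups[key_selector(record)].append(record)
    return dict(groups)


# Same extraction logic as MyKeySelector.get_key: use the record's id as the key.
groups = group_by_key(
    [Record(1, "hello"), Record(2, "world"), Record(1, "again")],
    key_selector=lambda value: value.id,
)
print(groups)
```

What matters in this model is that the selector is deterministic. The same element must always produce the same key, so that all elements for one key are processed together.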
