This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new 836c45d [doc] Add instructions for java in integration and operations
docs. (#265)
836c45d is described below
commit 836c45daa16df11a8e69adaed9e51e205d0b6eab
Author: Wenjin Xie <[email protected]>
AuthorDate: Thu Oct 9 20:03:34 2025 +0800
[doc] Add instructions for java in integration and operations docs. (#265)
---
.../docs/development/integrate_with_flink.md | 97 +++++++++++++++++++++-
docs/content/docs/operations/configuration.md | 30 +++++++
docs/content/docs/operations/deployment.md | 21 +++++
docs/content/docs/operations/monitoring.md | 34 +++++++-
4 files changed, 177 insertions(+), 5 deletions(-)
diff --git a/docs/content/docs/development/integrate_with_flink.md
b/docs/content/docs/development/integrate_with_flink.md
index bad997e..b1e7d29 100644
--- a/docs/content/docs/development/integrate_with_flink.md
+++ b/docs/content/docs/development/integrate_with_flink.md
@@ -27,14 +27,33 @@ Flink Agents is an Agentic AI framework based on Apache
Flink. By integrate agen
First of all, get the flink `StreamExecutionEnvironment` and flink-agents
`AgentsExecutionEnvironment`.
+{{< tabs "Prepare Agents Execution Environment for DataStream" >}}
+
+{{< tab "Python" >}}
```python
# Set up the Flink streaming environment and the Agents execution environment.
env = StreamExecutionEnvironment.get_execution_environment()
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+// Set up the Flink streaming environment and the Agents execution environment.
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
Integrate the agent with input `DataStream`, and return the output
`DataStream` can be consumed by downstream.
+{{< tabs "From/To DataStream" >}}
+
+{{< tab "Python" >}}
```python
# create input datastream
input_stream = env.from_source(...)
@@ -48,27 +67,72 @@ output_stream = (
.to_datastream()
)
-# comsume agent output datastream
+# consume agent output datastream
output_stream.print()
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+// create input datastream
+DataStream<String> inputStream = env.fromSource(...);
+
+// integrate agent with input datastream, and return output datastream
+DataStream<Object> outputStream =
+ agentsEnv
+ .fromDataStream(inputStream, (KeySelector<YourPojo, String>)
x::getId)
+ .apply(yourAgent)
+ .toDataStream();
+
+// consume agent output datastream
+outputStream.print();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
The input `DataStream` must be `KeyedStream`, or user should provide
`KeySelector` to tell how to convert the input `DataStream` to `KeyedStream`.
## From/To Flink Table API
First of all, get the flink `StreamExecutionEnvironment`,
`StreamTableEnvironment`, and flink-agents `AgentsExecutionEnvironment`.
+
+{{< tabs "Prepare Agents Execution Environment for Table" >}}
+
+{{< tab "Python" >}}
```python
# Set up the Flink streaming environment and table environment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
-# Setup flink agnets execution environment
+# Setup flink agents execution environment
agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env,
t_env=t_env)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+// Set up the Flink streaming environment and table environment
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+// Setup flink agents execution environment
+AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env, tableEnv);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
Integrate the agent with input `Table`, and return the output `Table` can be
consumed by downstream.
+{{< tabs "From/To Table" >}}
+
+{{< tab "Python" >}}
```python
+input_table = t_env.from_elements(...)
+
output_type = ExternalTypeInfo(RowTypeInfo(
[BasicTypeInfo.INT_TYPE_INFO()],
["result"],
@@ -82,8 +146,33 @@ output_table = (
.to_table(schema=schema, output_type=output_type)
)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+Table inputTable = tableEnv.fromValues(...);
+
+// Here the output schema should always be a nested row, of which
+// the f0 column is the expected row.
+Schema outputSchema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.ROW(DataTypes.FIELD("result",
DataTypes.DOUBLE())))
+ .build();
+
+Table outputTable =
+ agentsEnv
+ .fromTable(
+ inputTable,
+ myKeySelector)
+ .apply(agent)
+ .toTable(outputSchema);
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
User should provide `KeySelector` in `from_table()` to tell how to convert the
input `Table` to `KeyedStream` internally. And provide `Schema` and
`TypeInfomation` in `to_table()` to tell the output `Table` schema.
-{{< hint warning >}}
-__Note:__ Currently, user should provide both `Schema` and `TypeInformation`
when call `to_table()`, we will support only provide one of them in the future.
+{{< hint info >}}
+Currently, user should provide both `Schema` and `TypeInformation` when call
`to_table()`, we will support only provide one of them in the future.
{{< /hint >}}
\ No newline at end of file
diff --git a/docs/content/docs/operations/configuration.md
b/docs/content/docs/operations/configuration.md
index 550c16c..16e454d 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -42,6 +42,9 @@ For resources like `ChatModel`, `EmbeddingModel`, and
`VectorStore`, Flink Agent
#### Example: Defining a Math-Focused Chat Model
+{{< tabs "Example: Defining a Math-Focused Chat Model" >}}
+
+{{< tab "Python" >}}
```python
class MyAgent(Agent):
"""Example agent demonstrating the new ChatModel architecture."""
@@ -67,6 +70,33 @@ class MyAgent(Agent):
extract_reasoning=True # Enable reasoning extraction
)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+public class MyAgent extends Agent {
+ @ChatModelConnection
+ public static ResourceDescriptor ollamaConnection() {
+ return
ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+ .addInitialArgument("endpoint", "http://localhost:11434")
+ .addInitialArgument("requestTimeout", 120)
+ .build();
+ }
+
+ @ChatModelSetup
+ public static ResourceDescriptor mathChatModel() {
+ return
ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+ .addInitialArgument("connection", "ollamaConnection")
+ .addInitialArgument("model", OLLAMA_MODEL)
+ .addInitialArgument("tools", List.of("add"))
+ .addInitialArgument("extractReasoning", true)
+ .build();
+ }
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
### Setting via the AgentsExecutionEnvironment
diff --git a/docs/content/docs/operations/deployment.md
b/docs/content/docs/operations/deployment.md
index 71c7841..f4a0748 100644
--- a/docs/content/docs/operations/deployment.md
+++ b/docs/content/docs/operations/deployment.md
@@ -123,6 +123,9 @@ Follow the [instructions]({{< ref
"docs/get-started/installation" >}}) to instal
### Submit to Flink Cluster
+{{< tabs "Submit to Flink Cluster" >}}
+
+{{< tab "Python" >}}
Submitting Flink Agent jobs to the Flink Cluster is the same as submitting
PyFlink jobs. For more details on all available options, please refer to the
[Flink CLI
documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs).
```bash
@@ -130,5 +133,23 @@ Submitting Flink Agent jobs to the Flink Cluster is the
same as submitting PyFli
--jobmanager <FLINK_CLUSTER_ADDR> \
--python <PATH_TO_YOUR_FLINK_AGENTS_JOB>
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+Submitting Flink Agent jobs to the Flink Cluster is the same as submitting
Flink jobs. For more details on all available options, please refer to the
[Flink CLI
documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-a-job).
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c <MAIN_CLASS> \
+ <PATH_TO_YOUR_FLINK_AGENTS_JOB_JAR>
+```
+{{< hint warning >}}
+Currently, to resolve the classloader issue, user should place the Flink
Agents job jar to Flink lib directory before start the Flink cluster.
+{{< /hint >}}
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
Now you should see a Flink job submitted to the Flink Cluster in Flink web UI
(typically accessible at http://<jobmanagerHost>:8081).
diff --git a/docs/content/docs/operations/monitoring.md
b/docs/content/docs/operations/monitoring.md
index 19e8afb..db1c44a 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -45,6 +45,9 @@ In Flink Agents, users implement their logic by defining
custom Actions that res
Here is the user case example:
+{{< tabs "Custom Metrics" >}}
+
+{{< tab "Python" >}}
```python
class MyAgent(Agent):
@action(InputEvent)
@@ -67,6 +70,35 @@ class MyAgent(Agent):
action_metrics.get_histogram("actionLatencyMs") \
.update(int(time.time_ns() - start_time) // 1000000)
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```java
+public class MyAgent extends Agent {
+
+ @Action(listenEvents = {InputEvent.class})
+ public static void firstAction(InputEvent event, RunnerContext ctx) throws
Exception {
+ long startTime = System.currentTimeMillis();
+
+ // the action logic
+ ...
+
+ FlinkAgentsMetricGroup metrics = ctx.getAgentMetricGroup();
+
+ metrics.getCounter("numInputEvent").inc();
+ metrics.getMeter("numInputEventPerSec").markEvent();
+
+ FlinkAgentsMetricGroup actionMetrics = ctx.getActionMetricGroup();
+ actionMetrics
+ .getHistogram("actionLatencyMs")
+ .update(System.currentTimeMillis() - startTime);
+ }
+}
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
### How to check the metrics with Flink executor
@@ -82,7 +114,7 @@ The Flink Agents' log system uses Flink's logging framework.
For more details, p
### How to add log in Flink Agents
-For adding logs in Java code, you can refer to [Best practices for
developers](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/#best-practices-for-developers).
In Python, you can add logs using `logging`. Here is a specific example:
+For adding logs in Java code, you can refer to [Flink
documentation](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/#best-practices-for-developers).
In Python, you can add logs using `logging`. Here is a specific example:
```python
@action(InputEvent)