This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new 434c80050e9 [FLINK-38686][doc] Add model table api documentation (#27243) (#27246)
434c80050e9 is described below
commit 434c80050e94bcbf8c902527896bb6bb7ee1baec
Author: Hao Li <[email protected]>
AuthorDate: Tue Nov 18 00:33:32 2025 -0800
[FLINK-38686][doc] Add model table api documentation (#27243) (#27246)
---
docs/content.zh/docs/dev/table/tableApi.md | 108 ++++++++++++++++++++++++++++
docs/content/docs/dev/table/tableApi.md | 109 +++++++++++++++++++++++++++++
2 files changed, 217 insertions(+)
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index da78bf0fe74..d46f01bef79 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -2735,6 +2735,114 @@ result = t.select(col('a'), col('c')) \
{{< query_state_warning_zh >}}
+### Model Inference
+
+{{< label Streaming >}}
+
+The Table API supports model inference operations that allow you to integrate machine learning models directly into your data processing pipelines. You can create models with specific providers and use them to run inference on your data.
+
+#### Creating and Using Models
+
+Models are created using `ModelDescriptor`, which specifies the provider, input/output schemas, and configuration options. Once created, you can use the model to make predictions on tables.
+
+{{< tabs "model-inference" >}}
+{{< tab "Java" >}}
+
+```java
+// 1. Set up the local environment
+EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+// 2. Create a source table from in-memory data
+Table myTable = tEnv.fromValues(
+ ROW(FIELD("text", STRING())),
+ row("Hello"),
+ row("Machine Learning"),
+ row("Good morning")
+);
+
+// 3. Create model
+tEnv.createModel(
+ "my_model",
+ ModelDescriptor.forProvider("openai")
+ .inputSchema(Schema.newBuilder().column("input", STRING()).build())
+ .outputSchema(Schema.newBuilder().column("output", STRING()).build())
+ .option("endpoint", "https://api.openai.com/v1/chat/completions")
+ .option("model", "gpt-4.1")
+ .option("system-prompt", "translate to chinese")
+ .option("api-key", "<your-openai-api-key-here>")
+ .build()
+);
+
+Model model = tEnv.fromModel("my_model");
+
+// 4. Use the model to translate text to Chinese
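+// ColumnList.of("text") selects the table column(s) passed to the model's input schema.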
+Table predictResult = model.predict(myTable, ColumnList.of("text"));
+
+// 5. Async prediction example
+Table asyncPredictResult = model.predict(
+ myTable,
+ ColumnList.of("text"),
+ Map.of("async", "true")
+);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// 1. Set up the local environment
+val settings = EnvironmentSettings.inStreamingMode()
+val tEnv = TableEnvironment.create(settings)
+
+// 2. Create a source table from in-memory data
+val myTable: Table = tEnv.fromValues(
+ ROW(FIELD("text", STRING())),
+ row("Hello"),
+ row("Machine Learning"),
+ row("Good morning")
+)
+
+// 3. Create model
+tEnv.createModel(
+ "my_model",
+ ModelDescriptor.forProvider("openai")
+ .inputSchema(Schema.newBuilder().column("input", STRING()).build())
+ .outputSchema(Schema.newBuilder().column("output", STRING()).build())
+ .option("endpoint", "https://api.openai.com/v1/chat/completions")
+ .option("model", "gpt-4.1")
+ .option("system-prompt", "translate to chinese")
+ .option("api-key", "<your-openai-api-key-here>")
+ .build()
+)
+
+val model = tEnv.fromModel("my_model")
+
+// 4. Use the model to translate text to Chinese
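+// ColumnList.of("text") selects the table column(s) passed to the model's input schema.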
+val predictResult = model.predict(myTable, ColumnList.of("text"))
+
+// 5. Async prediction example
+val asyncPredictResult = model.predict(
+ myTable,
+ ColumnList.of("text"),
+ Map("async" -> "true").asJava
+)
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# Not yet supported in Python Table API
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Model inference supports both synchronous and asynchronous prediction modes (when supported by the underlying `ModelProvider` interface).
+By default, the planner uses asynchronous mode to maximize throughput for high-latency models by processing multiple requests concurrently.
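+
+If a provider cannot serve asynchronous requests, or you want strictly sequential calls, you can opt out explicitly. The snippet below is a minimal sketch that assumes the `async` runtime option shown above also accepts `"false"`; check your `ModelProvider`'s documentation for the options it actually supports.
+
+```java
+// Minimal sketch (assumption): force synchronous prediction by overriding
+// the same runtime option used in the async example above.
+Table syncPredictResult = model.predict(
+    myTable,
+    ColumnList.of("text"),
+    Map.of("async", "false")
+);
+```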
+
+{{< top >}}
+
<a name="data-types"></a>
Data Types
----------
diff --git a/docs/content/docs/dev/table/tableApi.md b/docs/content/docs/dev/table/tableApi.md
index d19cdcfc021..4b28838a3f2 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -2735,6 +2735,115 @@ result = t.select(col('a'), col('c')) \
{{< query_state_warning >}}
+### Model Inference
+
+{{< label Streaming >}}
+
+The Table API supports model inference operations that allow you to integrate machine learning models directly into your data processing pipelines. You can create models with specific providers and use them to run inference on your data.
+
+#### Creating and Using Models
+
+Models are created using `ModelDescriptor`, which specifies the provider, input/output schemas, and configuration options. Once created, you can use the model to make predictions on tables.
+
+{{< tabs "model-inference" >}}
+{{< tab "Java" >}}
+
+```java
+// 1. Set up the local environment
+EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
+TableEnvironment tEnv = TableEnvironment.create(settings);
+
+// 2. Create a source table from in-memory data
+Table myTable = tEnv.fromValues(
+ ROW(FIELD("text", STRING())),
+ row("Hello"),
+ row("Machine Learning"),
+ row("Good morning")
+);
+
+// 3. Create model
+tEnv.createModel(
+ "my_model",
+ ModelDescriptor.forProvider("openai")
+ .inputSchema(Schema.newBuilder().column("input", STRING()).build())
+ .outputSchema(Schema.newBuilder().column("output", STRING()).build())
+ .option("endpoint", "https://api.openai.com/v1/chat/completions")
+ .option("model", "gpt-4.1")
+ .option("system-prompt", "translate text to Chinese")
+ .option("api-key", "<your-openai-api-key-here>")
+ .build()
+);
+
+Model model = tEnv.fromModel("my_model");
+
+// 4. Use the model to translate text to Chinese
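+// ColumnList.of("text") selects the table column(s) passed to the model's input schema.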
+Table predictResult = model.predict(myTable, ColumnList.of("text"));
+
+// 5. Async prediction example
+Table asyncPredictResult = model.predict(
+ myTable,
+ ColumnList.of("text"),
+ Map.of("async", "true")
+);
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+
+```scala
+// 1. Set up the local environment
+val settings = EnvironmentSettings.inStreamingMode()
+val tEnv = TableEnvironment.create(settings)
+
+// 2. Create a source table from in-memory data
+val myTable: Table = tEnv.fromValues(
+ ROW(FIELD("text", STRING())),
+ row("Hello"),
+ row("Machine Learning"),
+ row("Good morning")
+)
+
+// 3. Create model
+tEnv.createModel(
+ "my_model",
+ ModelDescriptor.forProvider("openai")
+ .inputSchema(Schema.newBuilder().column("input", STRING()).build())
+ .outputSchema(Schema.newBuilder().column("output", STRING()).build())
+ .option("endpoint", "https://api.openai.com/v1/chat/completions")
+ .option("model", "gpt-4.1")
+ .option("system-prompt", "translate to chinese")
+ .option("api-key", "<your-openai-api-key-here>")
+ .build()
+)
+
+val model = tEnv.fromModel("my_model")
+
+// 4. Use the model to translate text to Chinese
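+// ColumnList.of("text") selects the table column(s) passed to the model's input schema.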
+val predictResult = model.predict(myTable, ColumnList.of("text"))
+
+// 5. Async prediction example
+val asyncPredictResult = model.predict(
+ myTable,
+ ColumnList.of("text"),
+ Map("async" -> "true").asJava
+)
+```
+
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# Not yet supported in Python Table API
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+Model inference supports both synchronous and asynchronous prediction modes (when supported by the underlying `ModelProvider` interface).
+By default, the planner uses asynchronous mode to maximize throughput for high-latency models by processing multiple requests concurrently.
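+
+If a provider cannot serve asynchronous requests, or you want strictly sequential calls, you can opt out explicitly. The snippet below is a minimal sketch that assumes the `async` runtime option shown above also accepts `"false"`; check your `ModelProvider`'s documentation for the options it actually supports.
+
+```java
+// Minimal sketch (assumption): force synchronous prediction by overriding
+// the same runtime option used in the async example above.
+Table syncPredictResult = model.predict(
+    myTable,
+    ColumnList.of("text"),
+    Map.of("async", "false")
+);
+```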
+
+{{< top >}}
+
Data Types
----------