This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8833af43fd5 [FLINK-38857][Model] Add docs for Triton inference model
(#27490)
8833af43fd5 is described below
commit 8833af43fd5a9ad476ff4cfb5af73162218b3b58
Author: Feat Zhang <[email protected]>
AuthorDate: Mon Mar 9 10:41:19 2026 +0800
[FLINK-38857][Model] Add docs for Triton inference model (#27490)
---
docs/content.zh/docs/connectors/models/triton.md | 776 +++++++++++++++++++++
docs/content.zh/docs/sql/reference/ddl/create.md | 16 +
.../docs/sql/reference/queries/model-inference.md | 9 +-
docs/content/docs/connectors/models/triton.md | 708 +++++++++++++++++++
docs/content/docs/sql/reference/ddl/create.md | 15 +
.../docs/sql/reference/queries/model-inference.md | 7 +
.../generated/model_triton_advanced_section.html | 60 ++
.../generated/model_triton_common_section.html | 36 +
.../shortcodes/generated/triton_configuration.html | 84 +++
.../flink/annotation/docs/Documentation.java | 4 +
.../apache/flink/model/triton/TritonOptions.java | 30 +-
11 files changed, 1737 insertions(+), 8 deletions(-)
diff --git a/docs/content.zh/docs/connectors/models/triton.md
b/docs/content.zh/docs/connectors/models/triton.md
new file mode 100644
index 00000000000..3b8bdf38a87
--- /dev/null
+++ b/docs/content.zh/docs/connectors/models/triton.md
@@ -0,0 +1,776 @@
+---
+title: "Triton"
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Triton
+
+<a name="overview"></a>
+
+## 概述
+
+Triton 模型函数允许 Flink SQL 调用 [NVIDIA Triton
推理服务器](https://github.com/triton-inference-server/server)进行实时模型推理任务。Triton
推理服务器是一个高性能推理服务解决方案,支持多种机器学习框架,包括 TensorFlow、PyTorch、ONNX、TensorRT 等。
+
+主要特性:
+* **高性能**:针对低延迟和高吞吐量推理进行了优化
+* **多框架支持**:支持来自各种 ML 框架的模型
+* **异步处理**:非阻塞推理请求,实现更好的资源利用
+* **灵活配置**:针对不同用例的全面配置选项
+* **资源管理**:高效的 HTTP 客户端池和自动资源清理
+* **容错能力**:内置重试机制,可配置重试次数
+
+{{< hint info >}}
+`flink-model-triton` 模块自 Flink 2.3 起可用。请确保你可以访问正在运行的 Triton 推理服务器实例。
+{{< /hint >}}
+
+<a name="usage-examples"></a>
+
+## 使用示例
+
+<a name="example-1-text-classification-basic"></a>
+
+### 示例 1:文本分类(基础)
+
+此示例演示对电影评论进行情感分析:
+
+{{< tabs "text-classification" >}}
+{{< tab "SQL" >}}
+```sql
+-- 创建 Triton 模型
+CREATE MODEL triton_sentiment_classifier
+INPUT (`input` STRING)
+OUTPUT (`output` STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://localhost:8000/v2/models',
+ 'model-name' = 'text-classification',
+ 'model-version' = '1',
+ 'timeout' = '10000'
+);
+
+-- 准备源数据
+CREATE TEMPORARY VIEW movie_reviews(id, movie_name, user_review,
actual_sentiment)
+AS VALUES
+ (1, 'Great Movie', 'This movie was absolutely fantastic! Great acting and
storyline.', 'positive'),
+ (2, 'Boring Film', 'I fell asleep halfway through. Very disappointing.',
'negative'),
+ (3, 'Average Show', 'It was okay, nothing special but not terrible either.',
'neutral');
+
+-- 创建输出表
+CREATE TEMPORARY TABLE classified_reviews(
+ id BIGINT,
+ movie_name VARCHAR,
+ predicted_sentiment VARCHAR,
+ actual_sentiment VARCHAR
+) WITH (
+ 'connector' = 'print'
+);
+
+-- 分类情感
+INSERT INTO classified_reviews
+SELECT id, movie_name, output as predicted_sentiment, actual_sentiment
+FROM ML_PREDICT(
+ TABLE movie_reviews,
+ MODEL triton_sentiment_classifier,
+ DESCRIPTOR(user_review)
+);
+```
+{{< /tab >}}
+{{< tab "Table API (Java)" >}}
+```java
+TableEnvironment tEnv = TableEnvironment.create(...);
+
+// 注册模型
+tEnv.executeSql(
+ "CREATE MODEL triton_sentiment_classifier " +
+ "INPUT (`input` STRING) " +
+ "OUTPUT (`output` STRING) " +
+ "WITH (" +
+ " 'provider' = 'triton', " +
+ " 'endpoint' = 'http://localhost:8000/v2/models', " +
+ " 'model-name' = 'text-classification', " +
+ " 'model-version' = '1', " +
+ " 'timeout' = '10000'" +
+ ")"
+);
+
+// 注册源表
+tEnv.executeSql(
+ "CREATE TEMPORARY VIEW movie_reviews(id, movie_name, user_review,
actual_sentiment) " +
+ "AS VALUES " +
+ " (1, 'Great Movie', 'This movie was absolutely fantastic!', 'positive'),
" +
+ " (2, 'Boring Film', 'I fell asleep halfway through.', 'negative')"
+);
+
+// 执行分类
+Table result = tEnv.sqlQuery(
+ "SELECT id, movie_name, output as predicted_sentiment " +
+ "FROM ML_PREDICT(" +
+ " TABLE movie_reviews, " +
+ " MODEL triton_sentiment_classifier, " +
+ " DESCRIPTOR(user_review)" +
+ ")"
+);
+
+result.execute().print();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+<a name="example-2-streaming-image-classification"></a>
+
+### 示例 2:流式图像分类
+
+使用 ResNet 模型对来自 Kafka 流的图像进行分类:
+
+```sql
+-- 注册图像分类模型
+CREATE MODEL image_classifier
+INPUT (image_pixels ARRAY<FLOAT>)
+OUTPUT (predicted_class STRING, confidence FLOAT)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'resnet50',
+ 'model-version' = '1',
+ 'timeout' = '10000',
+ 'compression' = 'gzip' -- 为大型图像数据启用压缩
+);
+
+-- 来自 Kafka 的源表
+CREATE TEMPORARY TABLE image_stream (
+ image_id STRING,
+ image_pixels ARRAY<FLOAT>, -- 预处理后的图像作为浮点数组
+ upload_time TIMESTAMP(3),
+ WATERMARK FOR upload_time AS upload_time - INTERVAL '5' SECOND
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'images',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- 分类图像
+SELECT
+ image_id,
+ predicted_class,
+ confidence,
+ upload_time
+FROM ML_PREDICT(
+ TABLE image_stream,
+ MODEL image_classifier,
+ DESCRIPTOR(image_pixels)
+);
+```
+
+<a name="example-3-real-time-fraud-detection"></a>
+
+### 示例 3:实时欺诈检测
+
+使用高优先级推理进行欺诈检测:
+
+```sql
+-- 创建高优先级的欺诈检测模型
+CREATE MODEL fraud_detector
+INPUT (
+ user_id BIGINT,
+ amount DOUBLE,
+ merchant_id STRING,
+ device_fingerprint STRING
+)
+OUTPUT (fraud_score FLOAT)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'fraud_detection_model',
+ 'timeout' = '5000',
+ 'priority' = '200' -- 关键交易的高优先级
+);
+
+CREATE TEMPORARY TABLE transactions (
+ transaction_id STRING,
+ user_id BIGINT,
+ amount DECIMAL(10, 2),
+ merchant_id STRING,
+ device_fingerprint STRING,
+ transaction_time TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'transactions',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- 标记可疑交易
+SELECT
+ transaction_id,
+ user_id,
+ amount,
+ fraud_score,
+ CASE
+ WHEN fraud_score > 0.8 THEN 'HIGH_RISK'
+ WHEN fraud_score > 0.5 THEN 'MEDIUM_RISK'
+ ELSE 'LOW_RISK'
+ END AS risk_level
+FROM ML_PREDICT(
+ TABLE transactions,
+ MODEL fraud_detector,
+ DESCRIPTOR(user_id, amount, merchant_id, device_fingerprint)
+)
+WHERE fraud_score > 0.5; -- 对可疑交易发出警报
+```
+
+<a name="example-4-recommendation-system"></a>
+
+### 示例 4:推荐系统
+
+基于用户行为的产品推荐:
+
+```sql
+-- 注册推荐模型
+CREATE MODEL recommender
+INPUT (
+ user_features ARRAY<FLOAT>,
+ browsing_history ARRAY<STRING>,
+ context_features ARRAY<FLOAT>
+)
+OUTPUT (recommended_products ARRAY<STRING>, scores ARRAY<FLOAT>)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'product_recommender',
+ 'model-version' = '2'
+);
+
+CREATE TEMPORARY TABLE user_activity (
+ user_id BIGINT,
+ user_features ARRAY<FLOAT>,
+ browsing_history ARRAY<STRING>,
+ context_features ARRAY<FLOAT>,
+ event_time TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'user_events',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- 生成个性化推荐
+SELECT
+ user_id,
+ recommended_products,
+ scores,
+ event_time
+FROM ML_PREDICT(
+ TABLE user_activity,
+ MODEL recommender,
+ DESCRIPTOR(user_features, browsing_history, context_features)
+);
+```
+
+<a name="example-5-named-entity-recognition-ner"></a>
+
+### 示例 5:命名实体识别(NER)
+
+从文本文档中提取实体:
+
+```sql
+-- 注册 NER 模型,为大型文档启用压缩
+CREATE MODEL ner_model
+INPUT (document_text STRING)
+OUTPUT (entities ARRAY<STRING>, entity_types ARRAY<STRING>)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'bert_ner',
+ 'compression' = 'gzip'
+);
+
+CREATE TEMPORARY TABLE documents (
+ doc_id STRING,
+ document_text STRING,
+ source STRING,
+ created_time TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'documents',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- 提取命名实体
+SELECT
+ doc_id,
+ entities,
+ entity_types,
+ source
+FROM ML_PREDICT(
+ TABLE documents,
+ MODEL ner_model,
+ DESCRIPTOR(document_text)
+);
+```
+
+<a name="example-6-stateful-sequence-model"></a>
+
+### 示例 6:有状态序列模型
+
+使用带序列跟踪的有状态模型(RNN/LSTM):
+
+```sql
+-- 注册有状态对话模型
+CREATE MODEL conversation_model
+INPUT (message_text STRING)
+OUTPUT (bot_response STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'chatbot_lstm',
+ 'sequence-id' = 'conv-001', -- 唯一序列 ID
+ 'sequence-start' = 'true',
+ 'sequence-end' = 'false'
+);
+
+CREATE TEMPORARY TABLE chat_messages (
+ message_id STRING,
+ user_id BIGINT,
+ message_text STRING,
+ `timestamp` TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'chat',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- 处理带上下文的对话
+SELECT
+ message_id,
+ user_id,
+ bot_response,
+ `timestamp`
+FROM ML_PREDICT(
+ TABLE chat_messages,
+ MODEL conversation_model,
+ DESCRIPTOR(message_text)
+);
+```
+
+<a name="example-7-batch-inference"></a>
+
+### 示例 7:批量推理
+
+对历史数据执行批量推理:
+
+```sql
+-- 注册批处理模型
+CREATE MODEL batch_classifier
+INPUT (features ARRAY<DOUBLE>)
+OUTPUT (prediction STRING, confidence DOUBLE)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'classifier',
+ 'timeout' = '60000'
+);
+
+-- 批量源表
+CREATE TEMPORARY TABLE historical_data (
+ id BIGINT,
+ features ARRAY<DOUBLE>
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = 'hdfs:///data/historical',
+ 'format' = 'parquet'
+);
+
+-- 批量推理,结果写入接收器
+CREATE TEMPORARY TABLE classification_results (
+ id BIGINT,
+ prediction STRING,
+ confidence DOUBLE
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = 'hdfs:///results/classifications',
+ 'format' = 'parquet'
+);
+
+INSERT INTO classification_results
+SELECT id, prediction, confidence
+FROM ML_PREDICT(
+ TABLE historical_data,
+ MODEL batch_classifier,
+ DESCRIPTOR(features)
+);
+```
+
+<a name="example-8-secure-triton-server"></a>
+
+### 示例 8:安全的 Triton 服务器
+
+访问带身份验证的安全 Triton 服务器:
+
+```sql
+-- 注册带身份验证的模型
+CREATE MODEL secure_model
+INPUT (data STRING)
+OUTPUT (result STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'https://secure-triton:8000/v2/models',
+ 'model-name' = 'private_model',
+ 'auth-token' = 'Bearer your-token-here',
+ 'custom-headers' = '{"X-API-Key": "your-api-key", "X-Client-ID":
"flink-job-123"}'
+);
+
+SELECT id, result
+FROM ML_PREDICT(
+ TABLE sensitive_data,
+ MODEL secure_model,
+ DESCRIPTOR(data)
+);
+```
+
+{{< hint warning >}}
+切勿在 SQL 中硬编码敏感令牌。使用 Flink 的密钥管理或环境变量。
+{{< /hint >}}
+
+<a name="example-9-array-types-flatten-batch-dimension"></a>
+
+### 示例 9:数组类型与展平批次维度
+
+对于接受数组输入但不需要批次维度的模型:
+
+```sql
+-- 创建带数组输入和展平批次维度的模型
+CREATE MODEL triton_vector_model
+INPUT (input_vector ARRAY<FLOAT>)
+OUTPUT (output_vector ARRAY<FLOAT>)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://localhost:8000/v2/models',
+ 'model-name' = 'vector-transform',
+ 'model-version' = '1',
+ 'flatten-batch-dim' = 'true' -- 将 [1,N] 展平为 [N]
+);
+
+-- 使用模型进行推理
+CREATE TEMPORARY TABLE vector_input (
+ id BIGINT,
+ features ARRAY<FLOAT>
+) WITH (
+ 'connector' = 'datagen',
+ 'fields.features.length' = '128' -- 128 维向量
+);
+
+SELECT id, output_vector
+FROM ML_PREDICT(
+ TABLE vector_input,
+ MODEL triton_vector_model,
+ DESCRIPTOR(features)
+);
+```
+
+<a name="example-10-advanced-configuration"></a>
+
+### 示例 10:高级配置
+
+用于生产环境的综合设置:
+
+```sql
+CREATE MODEL triton_advanced_model
+INPUT (`input` STRING)
+OUTPUT (`output` STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'https://triton.example.com/v2/models',
+ 'model-name' = 'advanced-nlp-model',
+ 'model-version' = 'latest',
+ 'timeout' = '15000',
+ 'priority' = '100',
+ 'auth-token' = 'Bearer your-auth-token-here',
+ 'custom-headers' = '{"X-Custom-Header": "custom-value", "X-Request-ID":
"req-123"}',
+ 'compression' = 'gzip'
+);
+```
+
+<a name="model-options"></a>
+
+## 模型选项
+
+<a name="required-options"></a>
+
+### 必需选项
+
+{{< generated/model_triton_common_section >}}
+
+<a name="optional-options"></a>
+
+### 可选选项
+
+{{< generated/model_triton_advanced_section >}}
+
+<a name="schema-requirement"></a>
+
+## Schema 要求
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-center">输入类型</th>
+ <th class="text-center">输出类型</th>
+ <th class="text-left">说明</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>BOOLEAN, TINYINT, SMALLINT, INT, BIGINT</td>
+ <td>BOOLEAN, TINYINT, SMALLINT, INT, BIGINT</td>
+ <td>整数类型推理</td>
+ </tr>
+ <tr>
+ <td>FLOAT, DOUBLE</td>
+ <td>FLOAT, DOUBLE</td>
+ <td>浮点类型推理</td>
+ </tr>
+ <tr>
+ <td>STRING</td>
+ <td>STRING</td>
+ <td>文本到文本推理(分类、生成等)</td>
+ </tr>
+ <tr>
+ <td>ARRAY&lt;numeric types&gt;</td>
+ <td>ARRAY&lt;numeric types&gt;</td>
+ <td>数组推理(向量、张量等)。支持数值类型的数组。</td>
+ </tr>
+ </tbody>
+</table>
+
+**注意**:输入和输出类型必须与 Triton 模型配置中定义的类型匹配。
+
+要验证模型的预期输入/输出类型,请查询 Triton 服务器:
+```bash
+curl http://triton-server:8000/v2/models/{model_name}/config
+```
+
+<a name="triton-server-setup"></a>
+
+## Triton 服务器设置
+
+要使用此集成,你需要一个正在运行的 Triton 推理服务器。以下是基本设置指南:
+
+<a name="using-docker"></a>
+
+### 使用 Docker
+
+```bash
+# 拉取 Triton 服务器镜像
+docker pull nvcr.io/nvidia/tritonserver:23.10-py3
+
+# 使用你的模型存储库运行 Triton 服务器
+docker run --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \
+ -v /path/to/your/model/repository:/models \
+ nvcr.io/nvidia/tritonserver:23.10-py3 \
+ tritonserver --model-repository=/models
+```
+
+<a name="model-repository-structure"></a>
+
+### 模型存储库结构
+
+你的模型存储库应遵循以下结构:
+
+```
+model_repository/
+├── text-classification/
+│ ├── config.pbtxt
+│ └── 1/
+│ └── model.py # 或 model.onnx、model.plan 等
+└── other-model/
+ ├── config.pbtxt
+ └── 1/
+ └── model.savedmodel/
+```
+
+<a name="example-model-configuration"></a>
+
+### 示例模型配置
+
+以下是文本分类模型的 `config.pbtxt` 示例:
+
+```protobuf
+name: "text-classification"
+platform: "python"
+max_batch_size: 8
+input [
+ {
+ name: "INPUT_TEXT"
+ data_type: TYPE_STRING
+ dims: [ 1 ]
+ }
+]
+output [
+ {
+ name: "OUTPUT_TEXT"
+ data_type: TYPE_STRING
+ dims: [ 1 ]
+ }
+]
+```
+
+<a name="performance-considerations"></a>
+
+## 性能注意事项
+
+1. **连接池**:HTTP 客户端被池化和重用以提高效率
+2. **异步处理**:非阻塞请求防止线程饥饿
+3. **批处理**:配置批大小以实现最佳吞吐量
+ - 简单模型:batch-size 1-4
+ - 中等模型:batch-size 4-16
+ - 复杂模型:batch-size 16-32
+4. **资源管理**:HTTP 资源的自动清理
+5. **超时配置**:根据模型复杂度设置适当的超时值
+ - 简单模型:1-5 秒
+ - 中等模型(如 BERT):5-30 秒
+ - 复杂模型(如 GPT):30-120 秒
+6. **重试策略**:配置重试次数以处理瞬态故障
+7. **压缩**:对于 > 1KB 的负载启用 gzip 压缩
+8. **并行度**:将 Flink 并行度与 Triton 服务器容量匹配
+
+<a name="best-practices"></a>
+
+## 最佳实践
+
+<a name="model-version-management"></a>
+
+### 模型版本管理
+
+在生产环境中固定模型版本以确保一致性:
+```sql
+'model-version' = '3' -- 固定到版本 3 而不是 'latest'
+```
+
+<a name="error-handling-best-practices"></a>
+
+### 异常处理
+
+失败时使用默认值:
+```sql
+SELECT COALESCE(output, 'UNKNOWN') AS prediction
+FROM ML_PREDICT(...)
+```
+
+<a name="resource-configuration"></a>
+
+### 资源配置
+
+为高吞吐量场景配置足够的内存和网络缓冲区:
+```yaml
+taskmanager.memory.managed.size: 2gb
+taskmanager.network.memory.fraction: 0.2
+```
+
+<a name="error-handling"></a>
+
+## 异常处理
+
+该集成包括全面的异常处理:
+
+- **连接错误**:使用指数退避自动重试
+- **超时处理**:可配置的请求超时
+- **HTTP 错误**:来自 Triton 服务器的详细错误消息
+- **序列化错误**:JSON 解析和验证错误
+
+<a name="monitoring-and-debugging"></a>
+
+## 监控和调试
+
+启用调试日志以监控集成:
+
+```properties
+# 在 log4j2.properties 中
+logger.triton.name = org.apache.flink.model.triton
+logger.triton.level = DEBUG
+```
+
+这将提供有关以下内容的详细日志:
+- HTTP 请求/响应详细信息
+- 客户端连接管理
+- 错误条件和重试
+- 性能指标
+
+<a name="troubleshooting"></a>
+
+## 故障排查
+
+<a name="connection-issues"></a>
+
+### 连接问题
+- 验证 Triton 服务器正在运行:`curl http://triton-server:8000/v2/health/ready`
+- 检查网络连接和防火墙规则
+- 确保端点 URL 包含正确的协议(http/https)
+
+<a name="timeout-errors"></a>
+
+### 超时错误
+- 增加超时值:`'timeout' = '60000'`
+- 检查 Triton 服务器资源使用情况(CPU/GPU)
+- 监控 Triton 服务器日志以查找慢速模型执行
+
+<a name="type-mismatch"></a>
+
+### 类型不匹配
+- 验证模型 schema:`curl http://triton-server:8000/v2/models/{model}/config`
+- 显式转换 Flink 类型:`CAST(value AS FLOAT)`
+- 确保数组维度与模型期望匹配
+
+<a name="high-latency"></a>
+
+### 高延迟
+- 启用请求压缩:`'compression' = 'gzip'`
+- 增加 Triton 服务器实例
+- 在 Triton 服务器配置中使用动态批处理
+- 检查 Flink 和 Triton 之间的网络延迟
+
+<a name="dependencies"></a>
+
+## 依赖项
+
+要使用 Triton 模型函数,你需要在 Flink 应用程序中包含以下依赖项:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-model-triton</artifactId>
+ <version>${flink.version}</version>
+</dependency>
+```
+
+<a name="further-information"></a>
+
+## 更多信息
+
+- [Triton
推理服务器文档](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/)
+- [Triton
模型配置](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_configuration.md)
+- [Flink 异步 I/O]({{< ref "docs/dev/datastream/operators/asyncio" >}})
+- [Flink 指标]({{< ref "docs/ops/metrics" >}})
+
+{{< top >}}
diff --git a/docs/content.zh/docs/sql/reference/ddl/create.md
b/docs/content.zh/docs/sql/reference/ddl/create.md
index 1ca4e1c94d5..e5314fec71e 100644
--- a/docs/content.zh/docs/sql/reference/ddl/create.md
+++ b/docs/content.zh/docs/sql/reference/ddl/create.md
@@ -969,4 +969,20 @@ WITH (
);
```
+```sql
+CREATE MODEL triton_text_classifier
+INPUT (input STRING COMMENT '用于分类的输入文本')
+OUTPUT (output STRING COMMENT '分类结果')
+COMMENT '基于 Triton 的文本分类模型'
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://localhost:8000/v2/models',
+ 'model-name' = 'text-classification',
+ 'auth-token' = 'Bearer ${TRITON_TOKEN}',
+ 'model-version' = '1',
+ 'timeout' = '10000',
+ 'max-retries' = '3'
+);
+```
+
{{< top >}}
diff --git a/docs/content.zh/docs/sql/reference/queries/model-inference.md
b/docs/content.zh/docs/sql/reference/queries/model-inference.md
index 96b4623ecef..547f7f47fc0 100644
--- a/docs/content.zh/docs/sql/reference/queries/model-inference.md
+++ b/docs/content.zh/docs/sql/reference/queries/model-inference.md
@@ -112,7 +112,7 @@ SELECT * FROM ML_PREDICT(
如果配置中未设置 `async`,系统将选择同步或异步模型提供者,如果两者都存在,则优先使用异步模型提供者。
-### 错误处理
+### 异常处理
在以下情况下,函数将抛出异常:
- 模型在目录中不存在
@@ -131,4 +131,11 @@ SELECT * FROM ML_PREDICT(
- [模型创建]({{< ref "docs/sql/reference/ddl/create#create-model" >}})
- [模型修改]({{< ref "docs/sql/reference/ddl/alter#alter-model" >}})
+### 支持的模型提供者
+
+Flink 目前支持以下模型提供者:
+
+- **OpenAI**:用于调用 OpenAI API 服务。详情请参见 [OpenAI 模型文档]({{< ref
"docs/connectors/models/openai" >}})。
+- **Triton**:用于调用 NVIDIA Triton 推理服务器。详情请参见 [Triton 模型文档]({{< ref
"docs/connectors/models/triton" >}})。
+
{{< top >}}
diff --git a/docs/content/docs/connectors/models/triton.md
b/docs/content/docs/connectors/models/triton.md
new file mode 100644
index 00000000000..238c4f7646b
--- /dev/null
+++ b/docs/content/docs/connectors/models/triton.md
@@ -0,0 +1,708 @@
+---
+title: "Triton"
+weight: 2
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Triton
+
+## Overview
+
+The Triton Model Function allows Flink SQL to call [NVIDIA Triton Inference
Server](https://github.com/triton-inference-server/server) for real-time model
inference tasks. Triton Inference Server is a high-performance inference
serving solution that supports multiple machine learning frameworks including
TensorFlow, PyTorch, ONNX, TensorRT, and more.
+
+Key features:
+* **High Performance**: Optimized for low-latency and high-throughput inference
+* **Multi-Framework Support**: Works with models from various ML frameworks
+* **Asynchronous Processing**: Non-blocking inference requests for better
resource utilization
+* **Flexible Configuration**: Comprehensive configuration options for
different use cases
+* **Resource Management**: Efficient HTTP client pooling and automatic
resource cleanup
+* **Fault Tolerance**: Built-in retry mechanism with configurable attempts
+
+{{< hint info >}}
+The `flink-model-triton` module is available since Flink 2.3. Ensure you have
access to a running Triton Inference Server instance.
+{{< /hint >}}
+
+## Usage Examples
+
+### Example 1: Text Classification (Basic)
+
+This example demonstrates sentiment analysis on movie reviews:
+
+{{< tabs "text-classification" >}}
+{{< tab "SQL" >}}
+```sql
+-- Create the Triton model
+CREATE MODEL triton_sentiment_classifier
+INPUT (`input` STRING)
+OUTPUT (`output` STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://localhost:8000/v2/models',
+ 'model-name' = 'text-classification',
+ 'model-version' = '1',
+ 'timeout' = '10000'
+);
+
+-- Prepare source data
+CREATE TEMPORARY VIEW movie_reviews(id, movie_name, user_review,
actual_sentiment)
+AS VALUES
+ (1, 'Great Movie', 'This movie was absolutely fantastic! Great acting and
storyline.', 'positive'),
+ (2, 'Boring Film', 'I fell asleep halfway through. Very disappointing.',
'negative'),
+ (3, 'Average Show', 'It was okay, nothing special but not terrible either.',
'neutral');
+
+-- Create output table
+CREATE TEMPORARY TABLE classified_reviews(
+ id BIGINT,
+ movie_name VARCHAR,
+ predicted_sentiment VARCHAR,
+ actual_sentiment VARCHAR
+) WITH (
+ 'connector' = 'print'
+);
+
+-- Classify sentiment
+INSERT INTO classified_reviews
+SELECT id, movie_name, output as predicted_sentiment, actual_sentiment
+FROM ML_PREDICT(
+ TABLE movie_reviews,
+ MODEL triton_sentiment_classifier,
+ DESCRIPTOR(user_review)
+);
+```
+{{< /tab >}}
+{{< tab "Table API (Java)" >}}
+```java
+TableEnvironment tEnv = TableEnvironment.create(...);
+
+// Register the model
+tEnv.executeSql(
+ "CREATE MODEL triton_sentiment_classifier " +
+ "INPUT (`input` STRING) " +
+ "OUTPUT (`output` STRING) " +
+ "WITH (" +
+ " 'provider' = 'triton', " +
+ " 'endpoint' = 'http://localhost:8000/v2/models', " +
+ " 'model-name' = 'text-classification', " +
+ " 'model-version' = '1', " +
+ " 'timeout' = '10000'" +
+ ")"
+);
+
+// Register source table
+tEnv.executeSql(
+ "CREATE TEMPORARY VIEW movie_reviews(id, movie_name, user_review,
actual_sentiment) " +
+ "AS VALUES " +
+ " (1, 'Great Movie', 'This movie was absolutely fantastic!', 'positive'),
" +
+ " (2, 'Boring Film', 'I fell asleep halfway through.', 'negative')"
+);
+
+// Perform classification
+Table result = tEnv.sqlQuery(
+ "SELECT id, movie_name, output as predicted_sentiment " +
+ "FROM ML_PREDICT(" +
+ " TABLE movie_reviews, " +
+ " MODEL triton_sentiment_classifier, " +
+ " DESCRIPTOR(user_review)" +
+ ")"
+);
+
+result.execute().print();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Example 2: Image Classification with Streaming
+
+Classify images from a Kafka stream using a ResNet model:
+
+```sql
+-- Register image classification model
+CREATE MODEL image_classifier
+INPUT (image_pixels ARRAY<FLOAT>)
+OUTPUT (predicted_class STRING, confidence FLOAT)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'resnet50',
+ 'model-version' = '1',
+ 'timeout' = '10000',
+ 'compression' = 'gzip' -- Enable compression for large image data
+);
+
+-- Source table from Kafka
+CREATE TEMPORARY TABLE image_stream (
+ image_id STRING,
+ image_pixels ARRAY<FLOAT>, -- Preprocessed image as float array
+ upload_time TIMESTAMP(3),
+ WATERMARK FOR upload_time AS upload_time - INTERVAL '5' SECOND
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'images',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- Classify images
+SELECT
+ image_id,
+ predicted_class,
+ confidence,
+ upload_time
+FROM ML_PREDICT(
+ TABLE image_stream,
+ MODEL image_classifier,
+ DESCRIPTOR(image_pixels)
+);
+```
+
+### Example 3: Real-time Fraud Detection
+
+High-priority inference for fraud detection:
+
+```sql
+-- Create fraud detection model with high priority
+CREATE MODEL fraud_detector
+INPUT (
+ user_id BIGINT,
+ amount DOUBLE,
+ merchant_id STRING,
+ device_fingerprint STRING
+)
+OUTPUT (fraud_score FLOAT)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'fraud_detection_model',
+ 'timeout' = '5000',
+ 'priority' = '200' -- High priority for critical transactions
+);
+
+CREATE TEMPORARY TABLE transactions (
+ transaction_id STRING,
+ user_id BIGINT,
+ amount DECIMAL(10, 2),
+ merchant_id STRING,
+ device_fingerprint STRING,
+ transaction_time TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'transactions',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- Flag suspicious transactions
+SELECT
+ transaction_id,
+ user_id,
+ amount,
+ fraud_score,
+ CASE
+ WHEN fraud_score > 0.8 THEN 'HIGH_RISK'
+ WHEN fraud_score > 0.5 THEN 'MEDIUM_RISK'
+ ELSE 'LOW_RISK'
+ END AS risk_level
+FROM ML_PREDICT(
+ TABLE transactions,
+ MODEL fraud_detector,
+ DESCRIPTOR(user_id, amount, merchant_id, device_fingerprint)
+)
+WHERE fraud_score > 0.5; -- Alert on suspicious transactions
+```
+
+### Example 4: Recommendation System
+
+Product recommendations based on user behavior:
+
+```sql
+-- Register recommendation model
+CREATE MODEL recommender
+INPUT (
+ user_features ARRAY<FLOAT>,
+ browsing_history ARRAY<STRING>,
+ context_features ARRAY<FLOAT>
+)
+OUTPUT (recommended_products ARRAY<STRING>, scores ARRAY<FLOAT>)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'product_recommender',
+ 'model-version' = '2'
+);
+
+CREATE TEMPORARY TABLE user_activity (
+ user_id BIGINT,
+ user_features ARRAY<FLOAT>,
+ browsing_history ARRAY<STRING>,
+ context_features ARRAY<FLOAT>,
+ event_time TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'user_events',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- Generate personalized recommendations
+SELECT
+ user_id,
+ recommended_products,
+ scores,
+ event_time
+FROM ML_PREDICT(
+ TABLE user_activity,
+ MODEL recommender,
+ DESCRIPTOR(user_features, browsing_history, context_features)
+);
+```
+
+### Example 5: Named Entity Recognition (NER)
+
+Extract entities from text documents:
+
+```sql
+-- Register NER model with compression for large documents
+CREATE MODEL ner_model
+INPUT (document_text STRING)
+OUTPUT (entities ARRAY<STRING>, entity_types ARRAY<STRING>)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'bert_ner',
+ 'compression' = 'gzip'
+);
+
+CREATE TEMPORARY TABLE documents (
+ doc_id STRING,
+ document_text STRING,
+ source STRING,
+ created_time TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'documents',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- Extract named entities
+SELECT
+ doc_id,
+ entities,
+ entity_types,
+ source
+FROM ML_PREDICT(
+ TABLE documents,
+ MODEL ner_model,
+ DESCRIPTOR(document_text)
+);
+```
+
+### Example 6: Stateful Sequence Model
+
+Use stateful models (RNN/LSTM) with sequence tracking:
+
+```sql
+-- Register stateful conversation model
+CREATE MODEL conversation_model
+INPUT (message_text STRING)
+OUTPUT (bot_response STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'chatbot_lstm',
+ 'sequence-id' = 'conv-001', -- Unique sequence ID
+ 'sequence-start' = 'true',
+ 'sequence-end' = 'false'
+);
+
+CREATE TEMPORARY TABLE chat_messages (
+ message_id STRING,
+ user_id BIGINT,
+ message_text STRING,
+ `timestamp` TIMESTAMP(3)
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'chat',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
+
+-- Process conversation with context
+SELECT
+ message_id,
+ user_id,
+ bot_response,
+ `timestamp`
+FROM ML_PREDICT(
+ TABLE chat_messages,
+ MODEL conversation_model,
+ DESCRIPTOR(message_text)
+);
+```
+
+### Example 7: Batch Inference
+
+Perform batch inference on historical data:
+
+```sql
+-- Register model for batch processing
+CREATE MODEL batch_classifier
+INPUT (features ARRAY<DOUBLE>)
+OUTPUT (prediction STRING, confidence DOUBLE)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://triton-server:8000/v2/models',
+ 'model-name' = 'classifier',
+ 'timeout' = '60000'
+);
+
+-- Batch source table
+CREATE TEMPORARY TABLE historical_data (
+ id BIGINT,
+ features ARRAY<DOUBLE>
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = 'hdfs:///data/historical',
+ 'format' = 'parquet'
+);
+
+-- Batch inference with results written to sink
+CREATE TEMPORARY TABLE classification_results (
+ id BIGINT,
+ prediction STRING,
+ confidence DOUBLE
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = 'hdfs:///results/classifications',
+ 'format' = 'parquet'
+);
+
+INSERT INTO classification_results
+SELECT id, prediction, confidence
+FROM ML_PREDICT(
+ TABLE historical_data,
+ MODEL batch_classifier,
+ DESCRIPTOR(features)
+);
+```
+
+### Example 8: Secured Triton Server
+
+Access a secured Triton server with authentication:
+
+```sql
+-- Register model with authentication
+CREATE MODEL secure_model
+INPUT (data STRING)
+OUTPUT (result STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'https://secure-triton:8000/v2/models',
+ 'model-name' = 'private_model',
+ 'auth-token' = 'Bearer your-token-here',
+ 'custom-headers' = '{"X-API-Key": "your-api-key", "X-Client-ID":
"flink-job-123"}'
+);
+
+SELECT id, result
+FROM ML_PREDICT(
+ TABLE sensitive_data,
+ MODEL secure_model,
+ DESCRIPTOR(data)
+);
+```
+
+{{< hint warning >}}
+Never hardcode sensitive tokens in SQL. Use Flink's secret management or
environment variables.
+{{< /hint >}}
+
+### Example 9: Array Type with Flatten Batch Dimension
+
+For models that accept array inputs without batch dimension:
+
+```sql
+-- Create model with array input and flatten batch dimension
+CREATE MODEL triton_vector_model
+INPUT (input_vector ARRAY<FLOAT>)
+OUTPUT (output_vector ARRAY<FLOAT>)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://localhost:8000/v2/models',
+ 'model-name' = 'vector-transform',
+ 'model-version' = '1',
+ 'flatten-batch-dim' = 'true' -- Flatten [1,N] to [N]
+);
+
+-- Use the model for inference
+CREATE TEMPORARY TABLE vector_input (
+ id BIGINT,
+ features ARRAY<FLOAT>
+) WITH (
+ 'connector' = 'datagen',
+ 'fields.features.length' = '128' -- 128-dimensional vector
+);
+
+SELECT id, output_vector
+FROM ML_PREDICT(
+ TABLE vector_input,
+ MODEL triton_vector_model,
+ DESCRIPTOR(features)
+);
+```
+
+### Example 10: Advanced Configuration
+
+For production environments with comprehensive settings:
+
+```sql
+CREATE MODEL triton_advanced_model
+INPUT (`input` STRING)
+OUTPUT (`output` STRING)
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'https://triton.example.com/v2/models',
+ 'model-name' = 'advanced-nlp-model',
+ 'model-version' = 'latest',
+ 'timeout' = '15000',
+ 'priority' = '100',
+ 'auth-token' = 'Bearer your-auth-token-here',
+ 'custom-headers' = 'X-Custom-Header:custom-value,X-Request-ID:req-123',
+ 'compression' = 'gzip'
+);
+```
+
+## Model Options
+
+### Required Options
+
+{{< generated/model_triton_common_section >}}
+
+### Optional Options
+
+{{< generated/model_triton_advanced_section >}}
+
+## Schema Requirement
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-center">Input Type</th>
+ <th class="text-center">Output Type</th>
+ <th class="text-left">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>BOOLEAN, TINYINT, SMALLINT, INT, BIGINT</td>
+ <td>BOOLEAN, TINYINT, SMALLINT, INT, BIGINT</td>
+ <td>Integer type inference</td>
+ </tr>
+ <tr>
+ <td>FLOAT, DOUBLE</td>
+ <td>FLOAT, DOUBLE</td>
+ <td>Floating-point type inference</td>
+ </tr>
+ <tr>
+ <td>STRING</td>
+ <td>STRING</td>
+ <td>Text-to-text inference (classification, generation, etc.)</td>
+ </tr>
+ <tr>
+ <td>ARRAY&lt;numeric types&gt;</td>
+ <td>ARRAY&lt;numeric types&gt;</td>
+ <td>Array inference (vectors, tensors, etc.). Supports arrays of
numeric types.</td>
+ </tr>
+ </tbody>
+</table>
+
+**Note**: Input and output types must match the types defined in your Triton
model configuration.
+
+To verify your model's expected input/output types, query the Triton server:
+```bash
+curl http://triton-server:8000/v2/models/{model_name}/config
+```
+
+## Triton Server Setup
+
+To use this integration, you need a running Triton Inference Server. Here's a
basic setup guide:
+
+### Using Docker
+
+```bash
+# Pull Triton server image
+docker pull nvcr.io/nvidia/tritonserver:23.10-py3
+
+# Run Triton server with your model repository
+docker run --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \
+ -v /path/to/your/model/repository:/models \
+ nvcr.io/nvidia/tritonserver:23.10-py3 \
+ tritonserver --model-repository=/models
+```
+
+### Model Repository Structure
+
+Your model repository should follow this structure:
+
+```
+model_repository/
+├── text-classification/
+│   ├── config.pbtxt
+│   └── 1/
+│       └── model.py  # or model.onnx, model.plan, etc.
+└── other-model/
+    ├── config.pbtxt
+    └── 1/
+        └── model.savedmodel/
+```
+
+### Example Model Configuration
+
+Here's an example `config.pbtxt` for a text classification model:
+
+```protobuf
+name: "text-classification"
+platform: "python"
+max_batch_size: 8
+input [
+ {
+ name: "INPUT_TEXT"
+ data_type: TYPE_STRING
+ dims: [ 1 ]
+ }
+]
+output [
+ {
+ name: "OUTPUT_TEXT"
+ data_type: TYPE_STRING
+ dims: [ 1 ]
+ }
+]
+```
+
+## Performance Considerations
+
+1. **Connection Pooling**: HTTP clients are pooled and reused for efficiency
+2. **Asynchronous Processing**: Non-blocking requests prevent thread starvation
+3. **Batch Processing**: Configure batch size for optimal throughput
+ - Simple models: batch-size 1-4
+ - Medium models: batch-size 4-16
+ - Complex models: batch-size 16-32
+4. **Resource Management**: Automatic cleanup of HTTP resources
+5. **Timeout Configuration**: Set appropriate timeout values based on model
complexity
+ - Simple models: 1-5 seconds
+ - Medium models (e.g., BERT): 5-30 seconds
+ - Complex models (e.g., GPT): 30-120 seconds
+6. **Retry Strategy**: Configure retry attempts for handling transient failures
+7. **Compression**: Enable gzip compression for payloads > 1KB
+8. **Parallelism**: Match Flink parallelism to Triton server capacity
+
+## Best Practices
+
+### Model Version Management
+
+Pin model versions in production to ensure consistency:
+```sql
+'model-version' = '3' -- Pin to version 3 instead of 'latest'
+```
+
+### Fallback Values on Failure
+
+Use default values on failure:
+```sql
+SELECT COALESCE(output, 'UNKNOWN') AS prediction
+FROM ML_PREDICT(...)
+```
+
+### Resource Configuration
+
+Configure sufficient memory and network buffers for high-throughput scenarios:
+```yaml
+taskmanager.memory.managed.size: 2gb
+taskmanager.network.memory.fraction: 0.2
+```
+
+## Error Handling
+
+The integration includes comprehensive error handling:
+
+- **Connection Errors**: Automatic retry with exponential backoff
+- **Timeout Handling**: Configurable request timeouts
+- **HTTP Errors**: Detailed error messages from Triton server
+- **Serialization Errors**: JSON parsing and validation errors
+
+## Monitoring and Debugging
+
+Enable debug logging to monitor the integration:
+
+```properties
+# In log4j2.properties
+logger.triton.name = org.apache.flink.model.triton
+logger.triton.level = DEBUG
+```
+
+This will provide detailed logs about:
+- HTTP request/response details
+- Client connection management
+- Error conditions and retries
+- Performance metrics
+
+## Troubleshooting
+
+### Connection Issues
+- Verify Triton server is running: `curl
http://triton-server:8000/v2/health/ready`
+- Check network connectivity and firewall rules
+- Ensure endpoint URL includes the correct protocol (http/https)
+
+### Timeout Errors
+- Increase timeout value: `'timeout' = '60000'`
+- Check Triton server resource usage (CPU/GPU)
+- Monitor Triton server logs for slow model execution
+
+### Type Mismatch
+- Verify model schema: `curl
http://triton-server:8000/v2/models/{model}/config`
+- Cast Flink types explicitly: `CAST(value AS FLOAT)`
+- Ensure array dimensions match model expectations
+
+### High Latency
+- Enable request compression: `'compression' = 'gzip'`
+- Increase Triton server instances
+- Use dynamic batching in Triton server configuration
+- Check network latency between Flink and Triton
+
+## Dependencies
+
+To use the Triton model function, you need to include the following dependency
in your Flink application:
+
+```xml
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-model-triton</artifactId>
+ <version>${flink.version}</version>
+</dependency>
+```
+
+## Further Information
+
+- [Triton Inference Server
Documentation](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/)
+- [Triton Model
Configuration](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/model_configuration.md)
+- [Flink Async I/O]({{< ref "docs/dev/datastream/operators/asyncio" >}})
+- [Flink Metrics]({{< ref "docs/ops/metrics" >}})
+
+{{< top >}}
diff --git a/docs/content/docs/sql/reference/ddl/create.md
b/docs/content/docs/sql/reference/ddl/create.md
index 06da3881c44..fff1f392b78 100644
--- a/docs/content/docs/sql/reference/ddl/create.md
+++ b/docs/content/docs/sql/reference/ddl/create.md
@@ -952,4 +952,19 @@ WITH (
);
```
+```sql
+CREATE MODEL triton_text_classifier
+INPUT (input STRING COMMENT 'Input text for classification')
+OUTPUT (output STRING COMMENT 'Classification result')
+COMMENT 'A Triton-based text classification model'
+WITH (
+ 'provider' = 'triton',
+ 'endpoint' = 'http://localhost:8000/v2/models',
+ 'model-name' = 'text-classification',
+ 'model-version' = '1',
+ 'timeout' = '10000',
+ 'max-retries' = '3'
+);
+```
+
{{< top >}}
diff --git a/docs/content/docs/sql/reference/queries/model-inference.md
b/docs/content/docs/sql/reference/queries/model-inference.md
index 2a70a8174c0..17e7678cd3d 100644
--- a/docs/content/docs/sql/reference/queries/model-inference.md
+++ b/docs/content/docs/sql/reference/queries/model-inference.md
@@ -131,4 +131,11 @@ The function will throw an exception in the following
cases:
- [Model Creation]({{< ref "docs/sql/reference/ddl/create#create-model" >}})
- [Model Alteration]({{< ref "docs/sql/reference/ddl/alter#alter-model" >}})
+### Supported Model Providers
+
+Flink currently supports the following model providers:
+
+- **OpenAI**: For calling OpenAI API services. See [OpenAI Model
Documentation]({{< ref "docs/connectors/models/openai" >}}) for details.
+- **Triton**: For calling NVIDIA Triton Inference Server. See [Triton Model
Documentation]({{< ref "docs/connectors/models/triton" >}}) for details.
+
{{< top >}}
diff --git
a/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
b/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
new file mode 100644
index 00000000000..20347e0519f
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
@@ -0,0 +1,60 @@
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>auth-token</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Authentication token for secured Triton servers.</td>
+ </tr>
+ <tr>
+ <td><h5>compression</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Compression algorithm for request body. Currently only <code
class="highlighter-rouge">gzip</code> is supported. When enabled, the request
body will be compressed to reduce network bandwidth.</td>
+ </tr>
+ <tr>
+ <td><h5>custom-headers</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Map</td>
+ <td>Custom HTTP headers as key-value pairs. Example: <code
class="highlighter-rouge">'X-Custom-Header:value,X-Another:value2'</code></td>
+ </tr>
+ <tr>
+ <td><h5>flatten-batch-dim</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to flatten the batch dimension for array inputs. When
true, shape [1,N] becomes [N]. Defaults to false.</td>
+ </tr>
+ <tr>
+ <td><h5>priority</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Request priority level (0-255). Higher values indicate higher
priority.</td>
+ </tr>
+ <tr>
+ <td><h5>sequence-end</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether this request marks the end of a sequence for stateful
models. When true, Triton will release the model's state after processing this
request. See <a
href="https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/architecture.html#stateful-models">Triton
Stateful Models</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>sequence-id</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Sequence ID for stateful models. A sequence represents a
series of inference requests that must be routed to the same model instance to
maintain state across requests (e.g., for RNN/LSTM models). See <a
href="https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/architecture.html#stateful-models">Triton
Stateful Models</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>sequence-start</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether this request marks the start of a new sequence for
stateful models. When true, Triton will initialize the model's state before
processing this request. See <a
href="https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/architecture.html#stateful-models">Triton
Stateful Models</a> for more details.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/model_triton_common_section.html
b/docs/layouts/shortcodes/generated/model_triton_common_section.html
new file mode 100644
index 00000000000..47edc929a3a
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/model_triton_common_section.html
@@ -0,0 +1,36 @@
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>endpoint</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Full URL of the Triton Inference Server endpoint, e.g., <code
class="highlighter-rouge">https://triton-server:8000/v2/models</code>. Both
HTTP and HTTPS are supported; HTTPS is recommended for production.</td>
+ </tr>
+ <tr>
+ <td><h5>model-name</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the model to invoke on Triton server.</td>
+ </tr>
+ <tr>
+ <td><h5>model-version</h5></td>
+ <td style="word-wrap: break-word;">"latest"</td>
+ <td>String</td>
+ <td>Version of the model to use. Defaults to 'latest'.</td>
+ </tr>
+ <tr>
+ <td><h5>timeout</h5></td>
+ <td style="word-wrap: break-word;">30 s</td>
+ <td>Duration</td>
+ <td>HTTP request timeout (connect + read + write). This applies
per individual request and is separate from Flink's async timeout. Defaults to
30 seconds.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/triton_configuration.html
b/docs/layouts/shortcodes/generated/triton_configuration.html
new file mode 100644
index 00000000000..692a5821f24
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/triton_configuration.html
@@ -0,0 +1,84 @@
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>auth-token</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Authentication token for secured Triton servers.</td>
+ </tr>
+ <tr>
+ <td><h5>compression</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Compression algorithm for request body. Currently only <code
class="highlighter-rouge">gzip</code> is supported. When enabled, the request
body will be compressed to reduce network bandwidth.</td>
+ </tr>
+ <tr>
+ <td><h5>custom-headers</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Map</td>
+ <td>Custom HTTP headers as key-value pairs. Example: <code
class="highlighter-rouge">'X-Custom-Header:value,X-Another:value2'</code></td>
+ </tr>
+ <tr>
+ <td><h5>endpoint</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Full URL of the Triton Inference Server endpoint, e.g., <code
class="highlighter-rouge">https://triton-server:8000/v2/models</code>. Both
HTTP and HTTPS are supported; HTTPS is recommended for production.</td>
+ </tr>
+ <tr>
+ <td><h5>flatten-batch-dim</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to flatten the batch dimension for array inputs. When
true, shape [1,N] becomes [N]. Defaults to false.</td>
+ </tr>
+ <tr>
+ <td><h5>model-name</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the model to invoke on Triton server.</td>
+ </tr>
+ <tr>
+ <td><h5>model-version</h5></td>
+ <td style="word-wrap: break-word;">"latest"</td>
+ <td>String</td>
+ <td>Version of the model to use. Defaults to 'latest'.</td>
+ </tr>
+ <tr>
+ <td><h5>priority</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Request priority level (0-255). Higher values indicate higher
priority.</td>
+ </tr>
+ <tr>
+ <td><h5>sequence-end</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether this request marks the end of a sequence for stateful
models. When true, Triton will release the model's state after processing this
request. See <a
href="https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/architecture.html#stateful-models">Triton
Stateful Models</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>sequence-id</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Sequence ID for stateful models. A sequence represents a
series of inference requests that must be routed to the same model instance to
maintain state across requests (e.g., for RNN/LSTM models). See <a
href="https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/architecture.html#stateful-models">Triton
Stateful Models</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>sequence-start</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether this request marks the start of a new sequence for
stateful models. When true, Triton will initialize the model's state before
processing this request. See <a
href="https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/user_guide/architecture.html#stateful-models">Triton
Stateful Models</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>timeout</h5></td>
+ <td style="word-wrap: break-word;">30 s</td>
+ <td>Duration</td>
+ <td>HTTP request timeout (connect + read + write). This applies
per individual request and is separate from Flink's async timeout. Defaults to
30 seconds.</td>
+ </tr>
+ </tbody>
+</table>
diff --git
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index 8b298eac038..ef44de3411b 100644
---
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -121,6 +121,10 @@ public final class Documentation {
public static final String MODEL_OPENAI_EMBEDDING =
"model_openai_embedding";
+ public static final String MODEL_TRITON_COMMON = "model_triton_common";
+
+ public static final String MODEL_TRITON_ADVANCED =
"model_triton_advanced";
+
private Sections() {}
}
diff --git
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
index f5b920776e4..bf3b7d8967a 100644
---
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
+++
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
@@ -18,6 +18,7 @@
package org.apache.flink.model.triton;
+import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -29,19 +30,17 @@ import java.util.Map;
import static org.apache.flink.configuration.description.TextElement.code;
-/**
- * Configuration options for Triton Inference Server model functions.
- *
- * <p>Documentation for these options will be added in a separate PR.
- */
[email protected](
- "Documentation for Triton options will be added in a separate PR")
+/** Configuration options for Triton Inference Server model functions. */
+@Experimental
public class TritonOptions {
private TritonOptions() {
// Utility class with static options only
}
+ @Documentation.Section(
+ value = {Documentation.Sections.MODEL_TRITON_COMMON},
+ position = 1)
public static final ConfigOption<String> ENDPOINT =
ConfigOptions.key("endpoint")
.stringType()
@@ -54,18 +53,27 @@ public class TritonOptions {
code("https://triton-server:8000/v2/models"))
.build());
+ @Documentation.Section(
+ value = {Documentation.Sections.MODEL_TRITON_COMMON},
+ position = 2)
public static final ConfigOption<String> MODEL_NAME =
ConfigOptions.key("model-name")
.stringType()
.noDefaultValue()
.withDescription("Name of the model to invoke on Triton
server.");
+ @Documentation.Section(
+ value = {Documentation.Sections.MODEL_TRITON_COMMON},
+ position = 3)
public static final ConfigOption<String> MODEL_VERSION =
ConfigOptions.key("model-version")
.stringType()
.defaultValue("latest")
.withDescription("Version of the model to use. Defaults to
'latest'.");
+ @Documentation.Section(
+ value = {Documentation.Sections.MODEL_TRITON_COMMON},
+ position = 4)
public static final ConfigOption<Duration> TIMEOUT =
ConfigOptions.key("timeout")
.durationType()
@@ -75,6 +83,7 @@ public class TritonOptions {
+ "This applies per individual request and
is separate from Flink's async timeout. "
+ "Defaults to 30 seconds.");
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<Boolean> FLATTEN_BATCH_DIM =
ConfigOptions.key("flatten-batch-dim")
.booleanType()
@@ -83,6 +92,7 @@ public class TritonOptions {
"Whether to flatten the batch dimension for array
inputs. "
+ "When true, shape [1,N] becomes [N].
Defaults to false.");
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<Integer> PRIORITY =
ConfigOptions.key("priority")
.intType()
@@ -90,6 +100,7 @@ public class TritonOptions {
.withDescription(
"Request priority level (0-255). Higher values
indicate higher priority.");
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<String> SEQUENCE_ID =
ConfigOptions.key("sequence-id")
.stringType()
@@ -106,6 +117,7 @@ public class TritonOptions {
"Triton Stateful Models"))
.build());
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<Boolean> SEQUENCE_START =
ConfigOptions.key("sequence-start")
.booleanType()
@@ -121,6 +133,7 @@ public class TritonOptions {
"Triton Stateful Models"))
.build());
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<Boolean> SEQUENCE_END =
ConfigOptions.key("sequence-end")
.booleanType()
@@ -136,6 +149,7 @@ public class TritonOptions {
"Triton Stateful Models"))
.build());
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<String> COMPRESSION =
ConfigOptions.key("compression")
.stringType()
@@ -148,12 +162,14 @@ public class TritonOptions {
code("gzip"))
.build());
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<String> AUTH_TOKEN =
ConfigOptions.key("auth-token")
.stringType()
.noDefaultValue()
.withDescription("Authentication token for secured Triton
servers.");
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
public static final ConfigOption<Map<String, String>> CUSTOM_HEADERS =
ConfigOptions.key("custom-headers")
.mapType()