featzhang created FLINK-39611:
---------------------------------
Summary: [Model] Add sequence ID auto-increment support for Triton inference
Key: FLINK-39611
URL: https://issues.apache.org/jira/browse/FLINK-39611
Project: Flink
Issue Type: Sub-task
Components: API / Core
Reporter: featzhang
Fix For: 2.2.0
h2. Motivation
The Triton inference integration (introduced in FLINK-38857) supports stateful
models via the {{sequence-id}} configuration. However, when a Flink job fails
over, restarts, or runs with parallelism > 1, every subtask (and every restarted
attempt) reuses the same static {{sequence-id}}. For *non-reentrant* or stateful
models served by Triton, this causes:
* Duplicate inference requests share the same sequence ID after failover,
which the server may reject or mishandle.
* Sequence batching on the Triton side cannot isolate parallel Flink subtasks.
* Stateful models retain context tied to a sequence ID that Flink has
unintentionally "reused", producing incorrect results.
We need a mechanism that generates a unique sequence ID for each request,
monotonically increasing within each subtask and isolated across job restarts.
h2. Proposal
Introduce a new {{ConfigOption}} {{sequence-id-auto-increment}} in
{{TritonOptions}}. When enabled, {{TritonInferenceModelFunction}} generates
sequence IDs in the following format:
{code}
{base-sequence-id}-{subtask-index}-{counter}
# e.g. flink-job-123-0-0, flink-job-123-0-1, flink-job-123-1-0
{code}
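As a sketch of the format, the three example IDs above can be produced by plain string concatenation. {{SequenceIdFormat}}, {{format}} and {{parseSuffix}} are hypothetical names used only for illustration, not part of the proposed change. Because the subtask index and the counter are plain decimal numbers without '-', the last two separators always delimit them. So, for a fixed base, two different (subtask, counter) pairs can never yield the same ID, even though a base such as {{flink-job-123}} itself contains '-'.

{code:java}
// Hypothetical helper illustrating the proposed ID format; not the actual Flink code.
public class SequenceIdFormat {

    /** Builds {base-sequence-id}-{subtask-index}-{counter}. */
    public static String format(String baseSequenceId, int subtaskIndex, long counter) {
        return baseSequenceId + "-" + subtaskIndex + "-" + counter;
    }

    /**
     * Recovers {subtaskIndex, counter} by splitting on the LAST two '-' characters.
     * Splitting from the left would fail because the base itself may contain '-'.
     */
    public static long[] parseSuffix(String sequenceId) {
        int lastDash = sequenceId.lastIndexOf('-');
        int prevDash = sequenceId.lastIndexOf('-', lastDash - 1);
        long subtaskIndex = Long.parseLong(sequenceId.substring(prevDash + 1, lastDash));
        long counter = Long.parseLong(sequenceId.substring(lastDash + 1));
        return new long[] {subtaskIndex, counter};
    }

    public static void main(String[] args) {
        System.out.println(format("flink-job-123", 0, 0)); // flink-job-123-0-0
        System.out.println(format("flink-job-123", 0, 1)); // flink-job-123-0-1
        System.out.println(format("flink-job-123", 1, 0)); // flink-job-123-1-0

        long[] parts = parseSuffix("flink-job-123-1-0");
        System.out.println(parts[0] + " " + parts[1]); // 1 0
    }
}
{code}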
Implementation details:
* Add the {{sequence-id-auto-increment}} boolean option (default {{false}}) to
{{TritonOptions}}.
* Initialize an {{AtomicLong}} counter and cache the subtask index in
{{AbstractTritonModelFunction#open()}} so subclasses can reuse the state.
* In {{TritonInferenceModelFunction}}, when auto-increment is enabled, build
the sequence ID from the base value, subtask index and counter, incrementing
the counter for each inference request.
* Validate at construction time that {{sequence-id}} is set whenever
{{sequence-id-auto-increment=true}}; otherwise, fail fast with a clear error.
* Add debug-level logging for the generated sequence IDs.
* Regenerate the auto-generated config docs
({{model_triton_advanced_section.html}}, {{triton_configuration.html}}) so
{{ConfigOptionsDocsCompletenessITCase}} passes.
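The implementation steps above can be sketched as one stand-alone class. This is a minimal sketch under stated assumptions, not the actual Flink code: {{SequenceIdGenerator}}, its constructor and {{nextSequenceId()}} are hypothetical names, the subtask index is passed to {{open(int)}} as a plain integer instead of being read from the Flink runtime, and {{java.util.logging}} stands in for whatever logger the real function uses.

{code:java}
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical sketch of the proposed behavior; names are illustrative only.
public class SequenceIdGenerator {

    private static final Logger LOG = Logger.getLogger(SequenceIdGenerator.class.getName());

    private final String baseSequenceId;  // value of 'sequence-id'
    private final boolean autoIncrement;  // value of 'sequence-id-auto-increment'

    // Per-subtask state, (re)initialized in open(); open() must run before nextSequenceId().
    private int subtaskIndex;
    private AtomicLong counter;

    public SequenceIdGenerator(String baseSequenceId, boolean autoIncrement) {
        // Fail fast at construction time: auto-increment needs a base ID to extend.
        if (autoIncrement && (baseSequenceId == null || baseSequenceId.isEmpty())) {
            throw new IllegalArgumentException(
                    "'sequence-id-auto-increment' = 'true' requires 'sequence-id' to be set");
        }
        this.baseSequenceId = baseSequenceId;
        this.autoIncrement = autoIncrement;
    }

    /** Stands in for open(): caches the subtask index and starts a fresh counter at 0. */
    public void open(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
        this.counter = new AtomicLong(0);
    }

    /** Returns the sequence ID to attach to the next inference request. */
    public String nextSequenceId() {
        if (!autoIncrement) {
            return baseSequenceId; // unchanged behavior: the static ID (possibly null)
        }
        // getAndIncrement() is atomic, so two concurrent requests never get the same value.
        long n = counter.getAndIncrement();
        String id = baseSequenceId + "-" + subtaskIndex + "-" + n;
        LOG.fine(() -> "Generated Triton sequence ID: " + id); // debug-level logging
        return id;
    }

    public static void main(String[] args) {
        SequenceIdGenerator gen = new SequenceIdGenerator("flink-job-123", true);
        gen.open(1);
        System.out.println(gen.nextSequenceId()); // flink-job-123-1-0
        System.out.println(gen.nextSequenceId()); // flink-job-123-1-1
    }
}
{code}

Splitting the state this way follows the proposal: the configuration check runs once when the function is constructed, while the subtask index and counter are set up in {{open()}}, so each subtask, and each new attempt, begins with its own counter at 0.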
h2. Example
{code:sql}
CREATE MODEL my_triton_model WITH (
'provider' = 'triton',
'endpoint' = 'https://triton-server:8000/v2/models',
'model-name' = 'my_stateful_model',
'sequence-id' = 'flink-job-123',
'sequence-id-auto-increment' = 'true',
'sequence-start' = 'true',
'sequence-end' = 'true'
);
{code}
h2. Guarantees
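* *Uniqueness across parallel subtasks*: via the {{subtask-index}} component.
* *Monotonicity per subtask*: via the {{AtomicLong}} counter.
* *Isolation across job restarts*: the counter starts from 0 on every fresh
{{open()}}, so no counter state carries over from a previous attempt. With an
unchanged base {{sequence-id}}, a restarted subtask therefore reissues the same
IDs it generated before.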
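The guarantees can be checked with a small, self-contained simulation. This is a sketch under stated assumptions: {{Subtask}} and {{simulate}} are hypothetical names, each simulated subtask receives its requests concurrently from a thread pool (standing in for asynchronous inference calls), and IDs follow the {{base-subtaskIndex-counter}} format from the proposal.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simulation of the ID guarantees; not the actual Flink code.
public class SequenceIdGuarantees {

    /** Per-subtask state: cached subtask index plus a counter created at "open()". */
    public static final class Subtask {
        private final String base;
        private final int index;
        private final AtomicLong counter = new AtomicLong(0);

        public Subtask(String base, int index) {
            this.base = base;
            this.index = index;
        }

        /** Atomically claims the next counter value, so concurrent callers never share one. */
        public String next() {
            return base + "-" + index + "-" + counter.getAndIncrement();
        }
    }

    /** Fires `requests` concurrent requests at each of `parallelism` subtasks; returns all IDs. */
    public static List<String> simulate(String base, int parallelism, int requests) {
        List<String> ids = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < parallelism; i++) {
            Subtask subtask = new Subtask(base, i);
            for (int r = 0; r < requests; r++) {
                pool.submit(() -> { ids.add(subtask.next()); });
            }
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> ids = simulate("flink-job-123", 4, 1000);
        System.out.println(ids.size() + " IDs, " + new HashSet<>(ids).size() + " distinct");
        // prints "4000 IDs, 4000 distinct"

        // A freshly opened subtask (e.g. after a restart) starts counting from 0 again.
        System.out.println(new Subtask("flink-job-123", 0).next()); // flink-job-123-0-0
    }
}
{code}

With 4 subtasks and 1000 requests each, all 4000 IDs are distinct and every subtask hands out exactly the counters 0 to 999, even under concurrent calls. A newly constructed {{Subtask}} with the same base and index, as after a restart, starts again at {{flink-job-123-0-0}}.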
h2. Scope
This change is fully optional and isolated under
{{flink-models/flink-model-triton}}.
It does not affect existing Flink functionality and is gated by a new config
option that defaults to {{false}}.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)