[ https://issues.apache.org/jira/browse/FLINK-39611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
featzhang updated FLINK-39611:
------------------------------
Description:
h2. Motivation
The Triton inference integration (introduced in FLINK-38857) supports stateful models via the {{sequence-id}} configuration. However, that ID is static: every subtask of a job running with parallelism > 1 sends the same {{sequence-id}}, and the same value is reused after a failover or restart. For *non-reentrant* / stateful models served by Triton, this causes:
* Duplicate inference requests sharing the same sequence ID after failover, which the server may reject or mishandle.
* Sequence batching on the Triton side cannot isolate parallel Flink subtasks.
* Stateful models retain context tied to a sequence ID that Flink has
unintentionally "reused", producing incorrect results.
We need a mechanism to generate sequence IDs that are unique per subtask and per request, increase monotonically within each subtask, and remain isolated across job restarts.
h2. Proposal
Introduce a new {{ConfigOption}} {{sequence-id-auto-increment}} in
{{TritonOptions}}. When enabled, {{TritonInferenceModelFunction}} generates
sequence IDs in the following format:
{code}
{base-sequence-id}-{subtask-index}-{counter}
# e.g. flink-job-123-0-0, flink-job-123-0-1, flink-job-123-1-0
{code}
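A minimal Java sketch of the ID layout above. The class {{SequenceIds}} and its {{format}} method are hypothetical and not part of the Flink code base; only the {{base-subtask-counter}} layout comes from this proposal.

{code:java}
/**
 * Hypothetical helper (not Flink API) illustrating the proposed
 * {base-sequence-id}-{subtask-index}-{counter} layout.
 */
final class SequenceIds {
    private SequenceIds() {}

    /** Joins the three components with '-' in the order given by the format. */
    static String format(String baseSequenceId, int subtaskIndex, long counter) {
        return baseSequenceId + "-" + subtaskIndex + "-" + counter;
    }
}
{code}

For example, {{SequenceIds.format("flink-job-123", 1, 0)}} returns {{flink-job-123-1-0}}, the third ID in the example line above.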
Implementation details:
* Add the {{sequence-id-auto-increment}} boolean option (default {{false}}) to
{{TritonOptions}}.
* Initialize an {{AtomicLong}} counter and cache the subtask index in
{{AbstractTritonModelFunction#open()}} so subclasses can reuse the state.
* In {{TritonInferenceModelFunction}}, when auto-increment is enabled, build
the sequence ID from the base value, subtask index and counter, incrementing
the counter for each inference request.
* Validate at construction time that {{sequence-id}} is set whenever {{sequence-id-auto-increment=true}}; otherwise fail fast with a clear error.
* Add debug-level logging for the generated sequence IDs.
* Regenerate the auto-generated config docs
({{model_triton_advanced_section.html}}, {{triton_configuration.html}}) so
{{ConfigOptionsDocsCompletenessITCase}} passes.
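The implementation steps above can be sketched without Flink dependencies as follows. This is an illustration under stated assumptions, not the code in the pull request: the class {{AutoIncrementSequenceIdSketch}} and its methods are hypothetical, the real state would live in {{AbstractTritonModelFunction}} / {{TritonInferenceModelFunction}}, the subtask index would be supplied by the Flink runtime rather than passed to {{open()}}, and debug logging is omitted.

{code:java}
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, Flink-free sketch of the auto-increment steps.
 * Assumption: open() receives the subtask index as a parameter.
 */
class AutoIncrementSequenceIdSketch {
    private final String baseSequenceId; // value of 'sequence-id' (may be null)
    private final boolean autoIncrement; // value of 'sequence-id-auto-increment'
    private AtomicLong counter;          // created fresh in open()
    private int subtaskIndex;            // cached in open()

    AutoIncrementSequenceIdSketch(String baseSequenceId, boolean autoIncrement) {
        // Fail fast at construction: auto-increment needs a base ID to build on.
        if (autoIncrement && (baseSequenceId == null || baseSequenceId.isEmpty())) {
            throw new IllegalArgumentException(
                    "'sequence-id-auto-increment' = 'true' requires 'sequence-id' to be set");
        }
        this.baseSequenceId = baseSequenceId;
        this.autoIncrement = autoIncrement;
    }

    /** Mirrors open(): cache the subtask index and start a new counter at 0. */
    void open(int subtaskIndex) {
        this.subtaskIndex = subtaskIndex;
        this.counter = new AtomicLong(0);
    }

    /** Returns the sequence ID for the next inference request. */
    String nextSequenceId() {
        if (!autoIncrement) {
            return baseSequenceId; // existing behaviour: one static ID for every request
        }
        // getAndIncrement() returns the current value, so the first request uses counter 0.
        long n = counter.getAndIncrement();
        return baseSequenceId + "-" + subtaskIndex + "-" + n;
    }
}
{code}

{{AtomicLong}} keeps the increment safe if several threads of one subtask issue requests concurrently (for example, asynchronous inference calls), which a plain {{long}} field would not.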
h2. Example
{code:sql}
CREATE MODEL my_triton_model WITH (
'provider' = 'triton',
'endpoint' = 'https://triton-server:8000/v2/models',
'model-name' = 'my_stateful_model',
'sequence-id' = 'flink-job-123',
'sequence-id-auto-increment' = 'true',
'sequence-start' = 'true',
'sequence-end' = 'true'
);
{code}
h2. Guarantees
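* *Uniqueness across parallel subtasks*: via {{subtask-index}}.
* *Monotonicity per subtask*: via the {{AtomicLong}} counter.
* *Isolation across job restarts*: no counter state carries over, because the counter starts from 0 on every fresh {{open()}}. With an unchanged base {{sequence-id}}, a restarted subtask therefore regenerates the IDs of its previous attempt; IDs stay distinct across restarts only if the base {{sequence-id}} differs per run.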
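The following Flink-free Java sketch simulates the generation scheme to check the guarantees above. The class {{SequenceIdGuarantees}} and its methods are hypothetical; each call to {{runAttempt}} stands in for one {{open()}} of a subtask followed by a number of inference requests.

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical simulation of the proposed ID scheme; not Flink code. */
final class SequenceIdGuarantees {
    private SequenceIdGuarantees() {}

    /** One attempt of a subtask: a fresh counter (as after open()), then `requests` IDs. */
    static List<String> runAttempt(String base, int subtaskIndex, int requests) {
        AtomicLong counter = new AtomicLong(0); // reset on every simulated open()
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            ids.add(base + "-" + subtaskIndex + "-" + counter.getAndIncrement());
        }
        return ids;
    }

    /** True if no ID is generated twice across all subtasks of a single attempt. */
    static boolean uniqueAcrossSubtasks(String base, int parallelism, int requests) {
        Set<String> seen = new HashSet<>();
        for (int s = 0; s < parallelism; s++) {
            for (String id : runAttempt(base, s, requests)) {
                if (!seen.add(id)) {
                    return false; // duplicate found
                }
            }
        }
        return true;
    }
}
{code}

Under these assumptions, two calls to {{runAttempt("flink-job-123", 0, 3)}} return the same IDs ({{flink-job-123-0-0}} to {{flink-job-123-0-2}}), because the counter restarts at 0 and nothing else in the ID changes; a different base {{sequence-id}} is what makes IDs from separate runs distinct.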
h2. Scope
This change is fully optional and isolated under
{{flink-models/flink-model-triton}}.
It does not affect existing Flink functionality and is gated by a new config
option that defaults to {{false}}.
h2. Implementation
Pull Request: [apache/flink#27562|https://github.com/apache/flink/pull/27562] ({{[FLINK-38857][model] Add sequence ID auto-increment support for Triton inference}})
> [Model] Add sequence ID auto-increment support for Triton inference
> -------------------------------------------------------------------
>
> Key: FLINK-39611
> URL: https://issues.apache.org/jira/browse/FLINK-39611
> Project: Flink
> Issue Type: Sub-task
> Components: API / Core
> Reporter: featzhang
> Priority: Major
> Fix For: 2.2.0
--
This message was sent by Atlassian Jira
(v8.20.10#820010)