[ https://issues.apache.org/jira/browse/FLINK-39611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

featzhang updated FLINK-39611:
------------------------------
    Description: 
  h2. Motivation

  The Triton inference integration (introduced in FLINK-38857) supports stateful
  models via the {{sequence-id}} configuration. However, when a Flink job fails
  over, restarts, or runs with parallelism > 1, all subtasks reuse the same
  static {{sequence-id}}. For *non-reentrant* / stateful models served by Triton,
  this causes:

  * Duplicate inference requests sharing the same sequence ID after failover,
    which the server may reject or mishandle.
  * Sequence batching on the Triton side cannot isolate parallel Flink subtasks.
  * Stateful models retain context tied to a sequence ID that Flink has
    unintentionally "reused", producing incorrect results.

  We need a mechanism to generate unique, monotonically increasing sequence IDs
  per subtask and per request, while remaining isolated across job restarts.

  h2. Proposal

  Introduce a new {{ConfigOption}} {{sequence-id-auto-increment}} in
  {{TritonOptions}}. When enabled, {{TritonInferenceModelFunction}} generates
  sequence IDs in the following format:

  {code}
  {base-sequence-id}-{subtask-index}-{counter}
  # e.g. flink-job-123-0-0, flink-job-123-0-1, flink-job-123-1-0
  {code}
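  The ID scheme above can be sketched as a tiny generator. This is purely
  illustrative: {{SequenceIdGenerator}} is a hypothetical name, not an actual
  Flink class, and in the real patch the logic would live inside the model
  function rather than a standalone class.

  {code:java}
  import java.util.concurrent.atomic.AtomicLong;

  /** Illustrative sketch of the proposed {base}-{subtask}-{counter} scheme. */
  public class SequenceIdGenerator {
      private final String baseSequenceId;
      private final int subtaskIndex;
      // Per-instance counter: monotonically increasing within one subtask.
      private final AtomicLong counter = new AtomicLong(0);

      public SequenceIdGenerator(String baseSequenceId, int subtaskIndex) {
          this.baseSequenceId = baseSequenceId;
          this.subtaskIndex = subtaskIndex;
      }

      /** Builds the next ID and bumps the counter, e.g. flink-job-123-0-0. */
      public String next() {
          return baseSequenceId + "-" + subtaskIndex + "-" + counter.getAndIncrement();
      }
  }
  {code}

  With base {{flink-job-123}}, subtask 0 would emit {{flink-job-123-0-0}},
  {{flink-job-123-0-1}}, ... while subtask 1 independently emits
  {{flink-job-123-1-0}}, ... — which is exactly the uniqueness property needed.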

  Implementation details:

  * Add the {{sequence-id-auto-increment}} boolean option (default {{false}}) to
    {{TritonOptions}}.
  * Initialize an {{AtomicLong}} counter and cache the subtask index in
    {{AbstractTritonModelFunction#open()}} so subclasses can reuse the state.
  * In {{TritonInferenceModelFunction}}, when auto-increment is enabled, build
    the sequence ID from the base value, subtask index and counter, incrementing
    the counter for each inference request.
  * Validate at construction time that {{sequence-id-auto-increment=true}}
    requires {{sequence-id}} to be set; otherwise fail fast with a clear error.
  * Add debug-level logging for the generated sequence IDs.
  * Regenerate the auto-generated config docs
    ({{model_triton_advanced_section.html}}, {{triton_configuration.html}}) so
    {{ConfigOptionsDocsCompletenessITCase}} passes.
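  The bullets above could look roughly like the following. The class shape, the
  {{open(int)}} hook and the method names are assumptions for illustration only;
  they mirror the ticket's wording, not the actual Flink rich-function API.

  {code:java}
  import java.util.concurrent.atomic.AtomicLong;

  /** Sketch of the lifecycle and validation described above (names assumed). */
  abstract class AbstractTritonModelFunction {
      // Initialized in open() so subclasses can reuse the state.
      protected transient AtomicLong sequenceCounter;
      protected transient int subtaskIndex;

      private final String baseSequenceId;
      private final boolean autoIncrement;

      protected AbstractTritonModelFunction(String baseSequenceId, boolean autoIncrement) {
          // Fail fast at construction time: auto-increment requires a base ID.
          if (autoIncrement && (baseSequenceId == null || baseSequenceId.isEmpty())) {
              throw new IllegalArgumentException(
                  "'sequence-id-auto-increment' = 'true' requires 'sequence-id' to be set");
          }
          this.baseSequenceId = baseSequenceId;
          this.autoIncrement = autoIncrement;
      }

      /** Called on every (re)start, so the counter restarts from 0 after failover. */
      public void open(int subtaskIndex) {
          this.subtaskIndex = subtaskIndex;
          this.sequenceCounter = new AtomicLong(0);
      }

      protected String nextSequenceId() {
          if (!autoIncrement) {
              return baseSequenceId; // static behaviour, the unchanged default
          }
          return baseSequenceId + "-" + subtaskIndex + "-" + sequenceCounter.getAndIncrement();
      }
  }
  {code}

  Resetting the counter in {{open()}} rather than restoring it from state is a
  deliberate simplification: restart isolation comes from the fresh counter, not
  from checkpointed sequence state.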

  h2. Example

  {code:sql}
  CREATE MODEL my_triton_model WITH (
    'provider' = 'triton',
    'endpoint' = 'https://triton-server:8000/v2/models',
    'model-name' = 'my_stateful_model',
    'sequence-id' = 'flink-job-123',
    'sequence-id-auto-increment' = 'true',
    'sequence-start' = 'true',
    'sequence-end' = 'true'
  );
  {code}

  h2. Guarantees

  * *Uniqueness across parallel subtasks* — via {{subtask-index}}.
  * *Monotonicity per subtask* — via the {{AtomicLong}} counter.
  * *Isolation across job restarts* — counter starts from 0 on every fresh
    {{open()}}.

  h2. Scope

  This change is fully optional and isolated under {{flink-models/flink-model-triton}}.
  It does not affect existing Flink functionality and is gated by a new config
  option that defaults to {{false}}.

  h2. Implementation

  Pull Request: [apache/flink#27562|https://github.com/apache/flink/pull/27562] —
  {{[FLINK-38857][model] Add sequence ID auto-increment support for Triton inference}}



> [Model] Add sequence ID auto-increment support for Triton inference
> -------------------------------------------------------------------
>
>                 Key: FLINK-39611
>                 URL: https://issues.apache.org/jira/browse/FLINK-39611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Core
>            Reporter: featzhang
>            Priority: Major
>             Fix For: 2.2.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
