featzhang created FLINK-39611:
---------------------------------

             Summary: [Model] Add sequence ID auto-increment support for Triton inference
                 Key: FLINK-39611
                 URL: https://issues.apache.org/jira/browse/FLINK-39611
             Project: Flink
          Issue Type: Sub-task
          Components: API / Core
            Reporter: featzhang
             Fix For: 2.2.0



  h2. Motivation

  The Triton inference integration (introduced in FLINK-38857) supports stateful
  models via the {{sequence-id}} configuration. However, when a Flink job fails
  over, restarts, or runs with parallelism > 1, all subtasks reuse the same
  static {{sequence-id}}. For *non-reentrant* / stateful models served by Triton,
  this causes:

  * Duplicate inference requests sharing the same sequence ID after failover,
    which the server may reject or mis-handle.
  * Sequence batching on the Triton side cannot isolate parallel Flink subtasks.
  * Stateful models retain context tied to a sequence ID that Flink has
    unintentionally "reused", producing incorrect results.

  We need a mechanism to generate unique, monotonically increasing sequence IDs
  per subtask and per request, while remaining isolated across job restarts.

  h2. Proposal

  Introduce a new {{ConfigOption}} {{sequence-id-auto-increment}} in
  {{TritonOptions}}. When enabled, {{TritonInferenceModelFunction}} generates
  sequence IDs in the following format:

  {code}
  {base-sequence-id}-{subtask-index}-{counter}
  # e.g. flink-job-123-0-0, flink-job-123-0-1, flink-job-123-1-0
  {code}
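  The ID scheme above can be sketched in plain Java. This is a minimal illustration,
  not the actual {{TritonInferenceModelFunction}} code: the class {{SequenceIdGenerator}}
  and its {{next()}} method are hypothetical, and the subtask index is passed in directly
  instead of being read from the Flink runtime context in {{open()}}.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not Flink code) that builds IDs in the form
 * base-sequence-id, subtask index and counter joined by '-'.
 */
final class SequenceIdGenerator {
    private final String baseSequenceId;
    private final int subtaskIndex;
    // One counter per instance; a new instance (e.g. created in a fresh open()) starts at 0.
    private final AtomicLong counter = new AtomicLong();

    SequenceIdGenerator(String baseSequenceId, int subtaskIndex) {
        if (baseSequenceId == null || baseSequenceId.isEmpty()) {
            throw new IllegalArgumentException("baseSequenceId must be non-empty");
        }
        if (subtaskIndex < 0) {
            throw new IllegalArgumentException("subtaskIndex must be >= 0");
        }
        this.baseSequenceId = baseSequenceId;
        this.subtaskIndex = subtaskIndex;
    }

    /** Returns the next ID for this subtask; getAndIncrement() is atomic across threads. */
    String next() {
        long n = counter.getAndIncrement();
        return baseSequenceId + "-" + subtaskIndex + "-" + n;
    }
}
```

  With base {{flink-job-123}}, subtask 0 yields {{flink-job-123-0-0}} and then
  {{flink-job-123-0-1}}, while subtask 1 starts at {{flink-job-123-1-0}}, matching the
  example in the format block.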

  Implementation details:

  * Add the {{sequence-id-auto-increment}} boolean option (default {{false}}) to
    {{TritonOptions}}.
  * Initialize an {{AtomicLong}} counter and cache the subtask index in
    {{AbstractTritonModelFunction#open()}} so subclasses can reuse the state.
  * In {{TritonInferenceModelFunction}}, when auto-increment is enabled, build
    the sequence ID from the base value, subtask index and counter, incrementing
    the counter for each inference request.
  * Validate at construction time that {{sequence-id-auto-increment=true}}
    requires {{sequence-id}} to be set; otherwise fail fast with a clear error.
  * Add debug-level logging for the generated sequence IDs.
  * Regenerate the auto-generated config docs
    ({{model_triton_advanced_section.html}}, {{triton_configuration.html}}) so
    {{ConfigOptionsDocsCompletenessITCase}} passes.
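  The fail-fast validation from the list above could look roughly like the following.
  To stay self-contained, the sketch reads raw string options from a {{Map}} rather than
  Flink's configuration classes, and the class name {{TritonSequenceOptionsValidator}} is
  hypothetical. {{Boolean.parseBoolean}} treats any value other than {{true}}
  (case-insensitive) as {{false}}, which may differ from how Flink parses boolean options.

```java
import java.util.Map;

/** Hypothetical construction-time check; not the actual TritonOptions code. */
final class TritonSequenceOptionsValidator {
    static final String SEQUENCE_ID = "sequence-id";
    static final String AUTO_INCREMENT = "sequence-id-auto-increment";

    private TritonSequenceOptionsValidator() {}

    static void validate(Map<String, String> options) {
        // A missing key yields null, and parseBoolean(null) is false: the option's default.
        boolean autoIncrement = Boolean.parseBoolean(options.get(AUTO_INCREMENT));
        String base = options.get(SEQUENCE_ID);
        if (autoIncrement && (base == null || base.trim().isEmpty())) {
            // Fail fast with a message that names both options.
            throw new IllegalArgumentException(
                    "'" + AUTO_INCREMENT + "' = 'true' requires '" + SEQUENCE_ID + "' to be set.");
        }
    }
}
```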

  h2. Example

  {code:sql}
  CREATE MODEL my_triton_model WITH (
    'provider' = 'triton',
    'endpoint' = 'https://triton-server:8000/v2/models',
    'model-name' = 'my_stateful_model',
    'sequence-id' = 'flink-job-123',
    'sequence-id-auto-increment' = 'true',
    'sequence-start' = 'true',
    'sequence-end' = 'true'
  );
  {code}

  h2. Guarantees

  * *Uniqueness across parallel subtasks* — via {{subtask-index}}.
  * *Monotonicity per subtask* — via the {{AtomicLong}} counter.
  * *Isolation across job restarts* — counter starts from 0 on every fresh
    {{open()}}.
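  The first two guarantees can be checked with a small plain-Java simulation
  (hypothetical, not part of the Flink code base). Each simulated subtask gets its own
  {{AtomicLong}}, and four pool threads draw IDs from it concurrently, as overlapping
  asynchronous requests might. {{getAndIncrement()}} hands each caller a distinct,
  increasing value, and the subtask index separates subtasks whose counters coincide.
  The sketch does not model failover.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical simulation of the uniqueness guarantees; not Flink code. */
final class SequenceIdGuaranteeDemo {
    private SequenceIdGuaranteeDemo() {}

    /**
     * Simulates {@code parallelism} subtasks, each issuing {@code perSubtask} IDs from a
     * shared thread pool, and returns all generated IDs. Any duplicate would shrink the set.
     */
    static Set<String> simulate(String base, int parallelism, int perSubtask) {
        Set<String> ids = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            final int index = subtask;
            // One counter per subtask, as if each subtask created its own in open().
            final AtomicLong counter = new AtomicLong();
            for (int i = 0; i < perSubtask; i++) {
                // Several threads increment the same counter concurrently.
                pool.execute(() -> ids.add(base + "-" + index + "-" + counter.getAndIncrement()));
            }
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return ids;
    }
}
```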

  h2. Scope

  This change is fully optional and isolated under {{flink-models/flink-model-triton}}.
  It does not affect existing Flink functionality and is gated by a new config
  option that defaults to {{false}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
