This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e329586e365 [FLINK-39225][model] Add retry support for triton
inference (#27561)
e329586e365 is described below
commit e329586e365f9da34601832654c5352211d401cf
Author: Feat Zhang <[email protected]>
AuthorDate: Thu May 7 19:51:32 2026 +0800
[FLINK-39225][model] Add retry support for triton inference (#27561)
---
.../generated/model_triton_advanced_section.html | 24 +
.../shortcodes/generated/triton_configuration.html | 24 +
flink-models/flink-model-triton/pom.xml | 11 +
.../model/triton/AbstractTritonModelFunction.java | 129 +++-
.../model/triton/TritonInferenceModelFunction.java | 700 ++++++++++++++++++---
.../model/triton/TritonModelProviderFactory.java | 5 +
.../apache/flink/model/triton/TritonOptions.java | 85 +++
.../triton/TritonBreakerFailureAccountingTest.java | 71 +++
.../triton/TritonDefaultValueFallbackTest.java | 103 +++
.../triton/TritonModelProviderFactoryTest.java | 8 +-
.../flink/model/triton/TritonRetryBackoffTest.java | 171 +++++
.../triton/TritonRetryConfigValidationTest.java | 148 +++++
.../flink/model/triton/TritonRetryOptionsTest.java | 68 ++
13 files changed, 1467 insertions(+), 80 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
b/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
index 794b20941ff..0d14b114b62 100644
--- a/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
@@ -50,6 +50,12 @@
<td>Map</td>
<td>Custom HTTP headers as key-value pairs. Example: <code
class="highlighter-rouge">'X-Custom-Header:value,X-Another:value2'</code></td>
</tr>
+ <tr>
+ <td><h5>default-value</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Fallback value to return when all retry attempts fail
(transient errors) or when the request fails with a non-retryable error (4xx).
This allows downstream processing to distinguish between successful and failed
predictions without propagating exceptions. Format depends on output type: for
STRING use plain text (e.g. <code class="highlighter-rouge">'FAILED'</code>);
for numeric types use string representation (e.g. <code
class="highlighter-rouge">'-1'</code>); for ARRAY t [...]
+ </tr>
<tr>
<td><h5>flatten-batch-dim</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -68,12 +74,30 @@
<td>Duration</td>
<td>Interval between health check requests. Shorter intervals
provide faster failure detection but increase server load. Defaults to 30
seconds. Only effective when health-check-enabled is true.</td>
</tr>
+ <tr>
+ <td><h5>max-retries</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Integer</td>
+ <td>Maximum number of retries (additional attempts beyond the
first) for failed inference requests. With max-retries=2 the request will be
attempted up to 3 times in total (1 initial attempt + 2 retries). When set to 0
(default), no retry is performed. Only transient failures are retried: network
errors and 5xx responses. Client-side 4xx errors, response parsing failures,
and circuit breaker OPEN failures are never retried because they indicate a
persistent condition. Must be [...]
+ </tr>
<tr>
<td><h5>priority</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Request priority level (0-255). Higher values indicate higher
priority.</td>
</tr>
+ <tr>
+ <td><h5>retry-initial-backoff</h5></td>
+ <td style="word-wrap: break-word;">100 ms</td>
+ <td>Duration</td>
+ <td>Initial backoff duration between retry attempts. Uses
exponential backoff with equal jitter: the nominal delay is initial-backoff *
2^attempt (first retry waits this duration, second retry waits 2x, third waits
4x, and so on), clamped to retry-max-backoff, then randomized in the range
[delay/2, delay] to prevent a thundering herd of concurrent retries hitting the
server at the exact same instant. Defaults to 100ms. Must be > 0.</td>
+ </tr>
+ <tr>
+ <td><h5>retry-max-backoff</h5></td>
+ <td style="word-wrap: break-word;">30 s</td>
+ <td>Duration</td>
+ <td>Upper bound on the delay between retry attempts. Exponential
backoff computed from retry-initial-backoff is clamped to this value so that a
misconfigured max-retries cannot produce hours-long sleeps or overflow the
delay computation. Defaults to 30s. Must be >= retry-initial-backoff.</td>
+ </tr>
<tr>
<td><h5>sequence-end</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/triton_configuration.html
b/docs/layouts/shortcodes/generated/triton_configuration.html
index 2ebfac39733..882d9d3f548 100644
--- a/docs/layouts/shortcodes/generated/triton_configuration.html
+++ b/docs/layouts/shortcodes/generated/triton_configuration.html
@@ -50,6 +50,12 @@
<td>Map</td>
<td>Custom HTTP headers as key-value pairs. Example: <code
class="highlighter-rouge">'X-Custom-Header:value,X-Another:value2'</code></td>
</tr>
+ <tr>
+ <td><h5>default-value</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Fallback value to return when all retry attempts fail
(transient errors) or when the request fails with a non-retryable error (4xx).
This allows downstream processing to distinguish between successful and failed
predictions without propagating exceptions. Format depends on output type: for
STRING use plain text (e.g. <code class="highlighter-rouge">'FAILED'</code>);
for numeric types use string representation (e.g. <code
class="highlighter-rouge">'-1'</code>); for ARRAY t [...]
+ </tr>
<tr>
<td><h5>endpoint</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -74,6 +80,12 @@
<td>Duration</td>
<td>Interval between health check requests. Shorter intervals
provide faster failure detection but increase server load. Defaults to 30
seconds. Only effective when health-check-enabled is true.</td>
</tr>
+ <tr>
+ <td><h5>max-retries</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Integer</td>
+ <td>Maximum number of retries (additional attempts beyond the
first) for failed inference requests. With max-retries=2 the request will be
attempted up to 3 times in total (1 initial attempt + 2 retries). When set to 0
(default), no retry is performed. Only transient failures are retried: network
errors and 5xx responses. Client-side 4xx errors, response parsing failures,
and circuit breaker OPEN failures are never retried because they indicate a
persistent condition. Must be [...]
+ </tr>
<tr>
<td><h5>model-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -92,6 +104,18 @@
<td>Integer</td>
<td>Request priority level (0-255). Higher values indicate higher
priority.</td>
</tr>
+ <tr>
+ <td><h5>retry-initial-backoff</h5></td>
+ <td style="word-wrap: break-word;">100 ms</td>
+ <td>Duration</td>
+ <td>Initial backoff duration between retry attempts. Uses
exponential backoff with equal jitter: the nominal delay is initial-backoff *
2^attempt (first retry waits this duration, second retry waits 2x, third waits
4x, and so on), clamped to retry-max-backoff, then randomized in the range
[delay/2, delay] to prevent a thundering herd of concurrent retries hitting the
server at the exact same instant. Defaults to 100ms. Must be > 0.</td>
+ </tr>
+ <tr>
+ <td><h5>retry-max-backoff</h5></td>
+ <td style="word-wrap: break-word;">30 s</td>
+ <td>Duration</td>
+ <td>Upper bound on the delay between retry attempts. Exponential
backoff computed from retry-initial-backoff is clamped to this value so that a
misconfigured max-retries cannot produce hours-long sleeps or overflow the
delay computation. Defaults to 30s. Must be >= retry-initial-backoff.</td>
+ </tr>
<tr>
<td><h5>sequence-end</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/flink-models/flink-model-triton/pom.xml
b/flink-models/flink-model-triton/pom.xml
index 4af2424a9bf..c3e14fe4317 100644
--- a/flink-models/flink-model-triton/pom.xml
+++ b/flink-models/flink-model-triton/pom.xml
@@ -113,6 +113,17 @@ under the License.
<scope>test</scope>
</dependency>
+ <!-- FactoryMocks (createModelContext) is published in the
table-common test-jar; we
+ use it to construct AbstractTritonModelFunction in
unit tests without standing up a
+ full TableEnvironment. -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
diff --git
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
index 916ffeda928..47d4ad15c41 100644
---
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
+++
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
@@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -66,6 +69,15 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
protected transient TritonCircuitBreaker circuitBreaker;
protected transient TritonHealthChecker healthChecker;
+ /**
+ * Scheduler used to delay retry attempts with exponential backoff. Owned
by this function so
+ * that pending retries are cancelled on {@link #close()} — relying on
{@link
+ * java.util.concurrent.CompletableFuture#delayedExecutor(long, TimeUnit)}
(which is backed by
+ * {@link java.util.concurrent.ForkJoinPool#commonPool()}) would leak
scheduled tasks past the
+ * operator lifecycle and risk firing HTTP calls against an
already-released client.
+ */
+ protected transient ScheduledExecutorService retryScheduler;
+
private final String endpoint;
/**
@@ -98,6 +110,10 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
private final String compression;
private final String authToken;
private final Map<String, String> customHeaders;
+ private final int maxRetries;
+ private final Duration retryInitialBackoff;
+ private final Duration retryMaxBackoff;
+ private final String defaultValue;
// Health check and circuit breaker configuration
private final boolean healthCheckEnabled;
@@ -122,6 +138,42 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
this.compression = config.get(TritonOptions.COMPRESSION);
this.authToken = config.get(TritonOptions.AUTH_TOKEN);
this.customHeaders = config.get(TritonOptions.CUSTOM_HEADERS);
+ this.maxRetries = config.get(TritonOptions.MAX_RETRIES);
+ this.retryInitialBackoff =
config.get(TritonOptions.RETRY_INITIAL_BACKOFF);
+ this.retryMaxBackoff = config.get(TritonOptions.RETRY_MAX_BACKOFF);
+ this.defaultValue = config.get(TritonOptions.DEFAULT_VALUE);
+
+ // Fail fast on invalid retry configuration. These are user-supplied
values so we surface
+ // misconfiguration as IllegalArgumentException before the function is
ever opened,
+ // instead of producing undefined behaviour (e.g. 1L << -1) at runtime.
+ Preconditions.checkArgument(
+ maxRetries >= 0,
+ "%s must be >= 0, got %s",
+ TritonOptions.MAX_RETRIES.key(),
+ maxRetries);
+ // Backoff durations only matter when retries are actually enabled.
Validating them when
+ // maxRetries == 0 would reject configurations that are otherwise
meaningless (a user who
+ // explicitly disables retries with max-retries=0 should not have to
also keep the
+ // backoff durations in a valid range, since they will never be
consulted).
+ if (maxRetries > 0) {
+ Preconditions.checkArgument(
+ retryInitialBackoff != null
+ && !retryInitialBackoff.isNegative()
+ && !retryInitialBackoff.isZero(),
+ "%s must be a positive duration, got %s",
+ TritonOptions.RETRY_INITIAL_BACKOFF.key(),
+ retryInitialBackoff);
+ Preconditions.checkArgument(
+ retryMaxBackoff != null
+ && !retryMaxBackoff.isNegative()
+ && !retryMaxBackoff.isZero()
+ && retryMaxBackoff.compareTo(retryInitialBackoff)
>= 0,
+ "%s must be a positive duration >= %s (%s), got %s",
+ TritonOptions.RETRY_MAX_BACKOFF.key(),
+ TritonOptions.RETRY_INITIAL_BACKOFF.key(),
+ retryInitialBackoff,
+ retryMaxBackoff);
+ }
// Health check and circuit breaker configuration
this.healthCheckEnabled =
config.get(TritonOptions.HEALTH_CHECK_ENABLED);
@@ -143,6 +195,28 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
LOG.debug("Creating Triton HTTP client.");
this.httpClient = TritonUtils.createHttpClient(timeout.toMillis());
+ // Provision a private single-thread scheduler for delayed retries so
that retry tasks are
+ // bound to this operator's lifecycle. Previously
CompletableFuture.delayedExecutor was
+ // used, but it schedules on the shared ForkJoinPool.commonPool()
whose tasks are not
+ // cancellable from close() and would happily fire an HTTP call with
an already-released
+ // client. Only allocate when retries are actually enabled to avoid
idle threads.
+ if (maxRetries > 0) {
+ // Single-thread executor → the thread name does not need an index
suffix; a fixed
+ // suffix would always be "-0" and carry no diagnostic value. The
subtask index IS
+ // included so that thread dumps from a parallelism>1 deployment
can be attributed
+ // to a specific subtask rather than aliasing every parallel
instance under the same
+ // "triton-retry-scheduler-<model>" name.
+ final int subtaskIndex =
context.getTaskInfo().getIndexOfThisSubtask();
+ final String threadName = "triton-retry-scheduler-" + modelName +
"-" + subtaskIndex;
+ this.retryScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t = new Thread(r, threadName);
+ t.setDaemon(true);
+ return t;
+ });
+ }
+
// Initialize circuit breaker if enabled
if (circuitBreakerEnabled) {
LOG.info(
@@ -232,6 +306,35 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
// Release circuit breaker (no-op, just drop the reference).
this.circuitBreaker = null;
+ // Shut down the retry scheduler before releasing the HTTP client so
that any pending
+ // retry tasks are cancelled and cannot fire a request through a
client that is about to
+ // be released back to the shared pool. shutdownNow() is safe here: a
cancelled retry
+ // simply means the caller sees the original inference failure rather
than a spurious
+ // post-close one.
+ if (this.retryScheduler != null) {
+ LOG.debug("Shutting down Triton retry scheduler for {}",
loggedEndpoint);
+ try {
+ this.retryScheduler.shutdownNow();
+ // Best-effort await so in-flight retry tasks observe the
interrupt before the
+ // client is closed. A short timeout is fine because retries
only schedule a
+ // single short-lived Runnable.
+ if (!this.retryScheduler.awaitTermination(1,
TimeUnit.SECONDS)) {
+ LOG.warn(
+ "Triton retry scheduler did not terminate within
1s for {}",
+ loggedEndpoint);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (firstFailure == null) {
+ firstFailure = e;
+ } else {
+ firstFailure.addSuppressed(e);
+ }
+ } finally {
+ this.retryScheduler = null;
+ }
+ }
+
// Release HTTP client last so it's always attempted even if earlier
steps threw.
if (this.httpClient != null) {
LOG.debug("Releasing Triton HTTP client.");
@@ -366,12 +469,12 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
"%s column '%s' has nested array type: %s\n"
+ "Multi-dimensional tensors (ARRAY<ARRAY<T>>) are
not supported in v1.\n"
+ "=== Supported Types ===\n"
- + " • Scalars: INT, BIGINT, FLOAT, DOUBLE,
BOOLEAN, STRING\n"
- + " • 1-D Arrays: ARRAY<INT>, ARRAY<FLOAT>,
ARRAY<DOUBLE>, etc.\n"
+ + " - Scalars: INT, BIGINT, FLOAT, DOUBLE,
BOOLEAN, STRING\n"
+ + " - 1-D Arrays: ARRAY<INT>, ARRAY<FLOAT>,
ARRAY<DOUBLE>, etc.\n"
+ "=== Workarounds ===\n"
- + " • Flatten to 1-D array: ARRAY<FLOAT> with
size = rows * cols\n"
- + " • Use JSON STRING encoding for complex
structures\n"
- + " • Wait for v2+ which will support ROW<...>
types",
+ + " - Flatten to 1-D array: ARRAY<FLOAT> with
size = rows * cols\n"
+ + " - Use JSON STRING encoding for complex
structures\n"
+ + " - Wait for v2+ which will support ROW<...>
types",
inputOrOutput,
columnName,
type);
@@ -541,4 +644,20 @@ public abstract class AbstractTritonModelFunction extends
AsyncPredictFunction {
protected boolean isCircuitBreakerEnabled() {
return circuitBreakerEnabled;
}
+
+ protected int getMaxRetries() {
+ return maxRetries;
+ }
+
+ protected Duration getRetryInitialBackoff() {
+ return retryInitialBackoff;
+ }
+
+ protected Duration getRetryMaxBackoff() {
+ return retryMaxBackoff;
+ }
+
+ protected String getDefaultValue() {
+ return defaultValue;
+ }
}
diff --git
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
index 7bebfc942c3..46bd0e0a96c 100644
---
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
+++
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
@@ -18,7 +18,9 @@
package org.apache.flink.model.triton;
import org.apache.flink.configuration.ReadableConfig;
+import
org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException;
import org.apache.flink.model.triton.exception.TritonClientException;
+import org.apache.flink.model.triton.exception.TritonException;
import org.apache.flink.model.triton.exception.TritonNetworkException;
import org.apache.flink.model.triton.exception.TritonSchemaException;
import org.apache.flink.model.triton.exception.TritonServerException;
@@ -28,6 +30,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.factories.ModelProviderFactory;
import org.apache.flink.table.functions.AsyncPredictFunction;
+import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
@@ -52,8 +55,12 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
/**
@@ -108,6 +115,36 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
private final String inputName;
private final String outputName;
+ /**
+ * Pre-parsed default-value payload, computed once at {@link
#open(FunctionContext)} and reused
+ * to build per-call fallback rows. Parsing is a pure function of the
configured string plus the
+ * output {@link LogicalType}, so caching trades a one-time startup cost
for zero per-record
+ * parsing work — important because a sustained Triton outage will route
every record through
+ * the fallback path.
+ *
+ * <p>We deliberately cache only the deserialized <b>field payload</b> (a
{@code BinaryString},
+ * primitive box, array, or {@code null}) rather than a {@code
Collection<RowData>}: handing the
+ * same {@code GenericRowData}/{@code List} instance to every failing
record would create
+ * implicit aliasing across emitted rows, which breaks the "each emitted
row is independent"
+ * contract that downstream operators (aggregations keeping per-row
references, changelog
+ * collectors, serializers reusing fields) rely on. Building a fresh {@link
+ * GenericRowData#of(Object...)} plus a fresh {@link
Collections#singletonList} per fallback is
+ * effectively free and sidesteps that aliasing hazard.
+ *
+ * <p>{@link #defaultValueConfigured} is the single source of truth for
whether a fallback is
+ * enabled; {@code cachedDefaultPayload} alone is ambiguous because a
configured {@code
+ * default-value='null'} legitimately parses to {@code null}.
+ */
+ private transient Object cachedDefaultPayload;
+
+ /**
+ * Whether a {@code default-value} fallback is configured. Tracked
separately from {@link
+ * #cachedDefaultPayload} because the SQL-NULL fallback (configured as the
literal string {@code
+ * 'null'}) legitimately deserializes to a Java {@code null}, which would
otherwise be
+ * indistinguishable from "no fallback configured".
+ */
+ private transient boolean defaultValueConfigured;
+
public TritonInferenceModelFunction(
ModelProviderFactory.Context factoryContext, ReadableConfig
config) {
super(factoryContext, config);
@@ -130,9 +167,61 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
this.outputName = outputColumn.getName();
}
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+
+ // Eagerly parse the configured default-value so that (a) a malformed
default-value
+ // surfaces as a job-submission failure instead of masking itself as a
silent runtime
+ // error on the first failing record, and (b) we pay zero JSON-parse
cost on the hot
+ // fallback path during a sustained backend outage.
+ if (getDefaultValue() != null) {
+ try {
+ this.cachedDefaultPayload =
parseDefaultPayload(getDefaultValue());
+ this.defaultValueConfigured = true;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Failed to parse configured default-value '%s'
for output type %s. "
+ + "Expected formats: STRING → plain
text; numeric → string "
+ + "representation (e.g. '-1'); ARRAY →
JSON array (e.g. "
+ + "'[0.0, 0.0]'); SQL NULL → 'null'.",
+ getDefaultValue(), outputType),
+ e);
+ }
+ }
+ }
+
@Override
public CompletableFuture<Collection<RowData>> asyncPredict(RowData
rowData) {
CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
+ asyncPredictWithRetry(rowData, future, 0);
+ return future;
+ }
+
+ /**
+ * Executes inference request with retry logic and default value fallback.
+ *
+ * @param rowData Input data for inference
+ * @param future The future to complete with result or exception
+ * @param attemptNumber Current attempt number (0-indexed)
+ */
+ private void asyncPredictWithRetry(
+ RowData rowData, CompletableFuture<Collection<RowData>> future,
int attemptNumber) {
+
+ // If the operator is tearing down, do not schedule/execute further
retry attempts.
+ // `attemptNumber > 0` means we are on a retry path (a scheduled
continuation); dropping
+ // it here lets Flink's normal cancellation path clean up the pending
future instead of
+ // us completing it on a dying thread pool.
+ if (attemptNumber > 0
+ && (Thread.currentThread().isInterrupted()
+ || retryScheduler == null
+ || retryScheduler.isShutdown())) {
+ LOG.debug(
+ "Skipping retry attempt {} because retry scheduler is
shutting down",
+ attemptNumber + 1);
+ return;
+ }
try {
// Check circuit breaker before making request
@@ -189,16 +278,23 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
@Override
public void onFailure(Call call, IOException
e) {
LOG.error(
- "Triton inference request failed
due to network error",
- e);
-
- // Record failure with circuit breaker
- recordFailure();
+ "Triton inference request failed
(attempt {}/{}) due to network error: {}",
+ attemptNumber + 1,
+ getMaxRetries() + 1,
+ e.getMessage());
// Wrap IOException in
TritonNetworkException. The sanitized
// URL is used in the user-visible message
so that basic-auth
// credentials embedded in the endpoint
cannot leak through
// exception stacks, CI logs, or error
dashboards.
+ //
+ // NOTE: We deliberately do NOT call
recordFailure() here.
+ // Retry state is "one logical request, N
physical attempts",
+ // and the circuit breaker counts logical
failures only. The
+ // failure is recorded exactly once in
handleFailureWithRetry
+ // when all retries are exhausted;
otherwise a 3-retry request
+ // that eventually succeeds would be
counted as 3 failures +
+ // 1 success, unfairly biasing the breaker
toward OPEN.
TritonNetworkException networkException =
new TritonNetworkException(
String.format(
@@ -207,7 +303,13 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
loggedUrl,
e.getMessage()),
e);
-
future.completeExceptionally(networkException);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ networkException,
+ /* countAsBreakerFailure */ true,
+ /* retryable */ true);
}
@Override
@@ -215,23 +317,23 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
throws IOException {
try {
if (!response.isSuccessful()) {
- // Record failure for 5xx errors
(server issues) so
- // they feed into the circuit
breaker's failure-rate
- // signal. 4xx errors are
intentionally NOT recorded:
+ // Let
handleErrorResponseWithRetry classify 4xx vs 5xx
+ // and route to retry /
default-value fallback. The
+ // circuit breaker is updated
exactly once per logical
+ // request (inside
handleFailureWithRetry when retries
+ // are exhausted); see the comment
in onFailure() for
+ // why per-attempt recording would
be incorrect.
+ //
+ // Historical context: 4xx
responses are intentionally
+ // NOT fed into the breaker even
at final failure -
// they represent client-side
configuration problems
// (wrong model name, bad shape,
missing auth) that
// would persist regardless of
server health. Folding
- // them into the circuit breaker
would cause a user
- // with a persistent config error
to force-open the
- // breaker and deny traffic to an
otherwise healthy
- // server. The trade-off - 4xx
calls do not contribute
- // to the breaker's denominator -
is a deliberate
- // choice; the breaker is a
server-health signal, not
- // a generic error-rate tracker.
- if (response.code() >= 500) {
- recordFailure();
- }
- handleErrorResponse(response,
future);
+ // them into the breaker would
cause a user with a
+ // persistent config error to
force-open it and deny
+ // traffic to an otherwise healthy
server.
+ handleErrorResponseWithRetry(
+ response, rowData, future,
attemptNumber);
return;
}
@@ -243,17 +345,58 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
// user's pipeline.
okhttp3.ResponseBody body =
response.body();
if (body == null) {
- recordFailure();
- future.completeExceptionally(
+ // Treat a missing body like any
other transient
+ // response failure: route through
the retry / default-
+ // value pipeline so the operator
behaves consistently
+ // across 5xx, network errors and
malformed proxy
+ // responses. Circuit-breaker
accounting happens
+ // exactly once in
handleFailureWithRetry on final
+ // exhaustion.
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
new TritonClientException(
"Triton response
has no body for "
+ loggedUrl
+ ". This
typically indicates a misbehaving "
+ "proxy
or interceptor between the client and Triton.",
- response.code()));
+ response.code()),
+ /* countAsBreakerFailure
*/ true,
+ /* retryable */ true);
+ return;
+ }
+ String responseBody;
+ try {
+ responseBody = body.string();
+ } catch (IOException bodyReadFailure) {
+ // Reading the 2xx response body
can still fail mid-
+ // stream (connection reset, proxy
truncation). Route
+ // this through the retry pipeline
with breaker
+ // accounting: it is a
network-level failure, not a
+ // server-side or client-side
logic error.
+ LOG.error(
+ "Failed to read Triton 2xx
response body (attempt {}/{}): {}",
+ attemptNumber + 1,
+ getMaxRetries() + 1,
+
bodyReadFailure.getMessage());
+ TritonNetworkException wrapped =
+ new TritonNetworkException(
+ String.format(
+ "Failed to
read Triton response body from %s: %s. "
+ +
"The connection was lost while streaming the response.",
+ loggedUrl,
+
bodyReadFailure.getMessage()),
+ bodyReadFailure);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ wrapped,
+ /* countAsBreakerFailure
*/ true,
+ /* retryable */ true);
return;
}
- String responseBody = body.string();
Collection<RowData> result =
parseInferenceResponse(responseBody);
@@ -264,46 +407,360 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
} catch (JsonProcessingException e) {
LOG.error("Failed to parse Triton
inference response", e);
// Don't record as circuit breaker
failure - this is a
- // client parsing issue
- future.completeExceptionally(
+ // client parsing issue (deterministic
bug in either our
+ // response handling or the server's
response schema),
+ // and retrying cannot help.
Short-circuit to fallback.
+ TritonClientException parseException =
new TritonClientException(
"Failed to parse
Triton response JSON: "
+
e.getMessage()
+ ". This may
indicate an incompatible response format.",
- 400));
+ 400);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ parseException,
+ /* countAsBreakerFailure */
false,
+ /* retryable */ false);
} catch (Exception e) {
LOG.error("Failed to process Triton
inference response", e);
- // Don't record as circuit breaker
failure - processing
- // error
- future.completeExceptionally(e);
+ // Don't record as circuit breaker
failure - client-side
+ // processing error (post-response),
not a backend health
+ // signal. Also not retryable
(deterministic).
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ e,
+ /* countAsBreakerFailure */
false,
+ /* retryable */ false);
} finally {
response.close();
}
}
});
- } catch (Exception e) {
+ } catch (TritonCircuitBreakerOpenException e) {
+ // Circuit breaker is OPEN - fail fast. The entire point of the
breaker is to shed
+ // load when the backend is known-bad; retrying (with or without
backoff) would
+ // defeat that protection and keep hammering an unhealthy server.
We still honour the
+ // default-value fallback so callers that opted into graceful
degradation see a
+ // fallback row instead of a propagated exception.
+ //
+ // Breaker accounting rule: "one logical request counts once". The
first physical
+ // attempt (attemptNumber == 0) never touches the backend when the
breaker is
+ // already OPEN — it short-circuits here, so recording a failure
would double-count
+ // against the next real failure and also let a
healthy-but-shedding breaker
+ // accelerate its own OPEN decisions. However, if we reach here on
a retry
+ // (attemptNumber > 0), the *original* attempt did produce a
genuine backend
+ // failure that triggered the retry path; that failure was
intentionally deferred
+ // to the "retries exhausted" branch of handleFailureWithRetry so
an eventual
+ // success would not over-count transient blips. Because the
breaker has since
+ // tripped (likely via concurrent requests), there will be no
"retries exhausted"
+ // call for this logical request, and without recording here the
original backend
+ // failure would be silently dropped from breaker statistics.
+ if (attemptNumber > 0) {
+ recordFailure();
+ }
+ LOG.debug(
+ "Circuit breaker OPEN; skipping retry and routing to
default-value fallback (if configured)");
+ completeWithDefaultValueOrFail(future, e);
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ // Deterministic client-side bug (bad input shape, unsupported
compression algo, etc.).
+ // Not retryable; a standalone client bug should not count against
the breaker. On a
+ // retry path (attemptNumber > 0) we must still flush the prior
deferred 5xx/network
+ // failure — see shouldRecordBreakerFailureOnFinalCompletion.
LOG.error("Failed to build Triton inference request", e);
- future.completeExceptionally(e);
+ if (attemptNumber > 0) {
+ recordFailure();
+ }
+ completeWithDefaultValueOrFail(future, e);
+ } catch (Exception e) {
+ // Unexpected failure during request dispatch (e.g. dispatcher
shutdown race). Treat
+ // as a transient infrastructure error: retry if budget remains,
but do NOT count
+ // against the breaker (the backend never saw this request).
+ LOG.error("Unexpected error dispatching Triton request", e);
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ e,
+ /* countAsBreakerFailure */ false,
+ /* retryable */ true);
}
+ }
- return future;
    /**
     * Handles request failure with retry logic or default value fallback.
     *
     * <p>Retry is only attempted for transient failures; circuit-breaker-open and unrecoverable
     * errors are short-circuited by the caller. The circuit breaker is updated at most once per
     * logical request (here, on final exhaustion) and only when the caller asserts that the
     * failure is a backend-health signal — deterministic client-side bugs (JSON parse errors,
     * malformed input) must not poison the breaker for an otherwise-healthy server.
     *
     * @param rowData Input data for inference
     * @param future The future to complete
     * @param attemptNumber Current attempt number (0-indexed)
     * @param error The error that caused the failure
     * @param countAsBreakerFailure Whether this failure should be counted against the circuit
     *     breaker when retries are exhausted. {@code true} for 5xx / network errors; {@code false}
     *     for 4xx / parse / deterministic client bugs.
     * @param retryable Whether retry should even be attempted. {@code false} short-circuits to the
     *     fallback/fail path immediately (e.g. deterministic client bugs).
     */
    private void handleFailureWithRetry(
            RowData rowData,
            CompletableFuture<Collection<RowData>> future,
            int attemptNumber,
            Throwable error,
            boolean countAsBreakerFailure,
            boolean retryable) {

        if (retryable && attemptNumber < getMaxRetries()) {
            long delayMs = computeBackoffDelayMillis(attemptNumber);

            // +2 = (0-indexed attemptNumber + 1 for the next attempt) + (1 to make 1-based for
            // human-readable logs). Total attempts is also 1-based (N retries => N+1 attempts).
            int nextAttemptOneBased = attemptNumber + 2;
            int totalAttempts = getMaxRetries() + 1;
            LOG.info(
                    "Retrying Triton inference request (attempt {}/{}) after {} ms",
                    nextAttemptOneBased,
                    totalAttempts,
                    delayMs);

            // Schedule retry on the function-owned scheduler so that cancellation on close()
            // takes effect and we never fire an HTTP call against a released client.
            if (retryScheduler == null || retryScheduler.isShutdown()) {
                // Should not happen while the operator is open with maxRetries > 0, but guard
                // defensively against race with close().
                LOG.warn("Retry scheduler unavailable; failing request without further retries");
                // Same accounting rule as the "retries exhausted" branch below: record once
                // when the current failure is breaker-worthy OR when a prior attempt
                // deferred its accounting here.
                if (shouldRecordBreakerFailureOnFinalCompletion(
                        countAsBreakerFailure, attemptNumber)) {
                    recordFailure();
                }
                completeWithDefaultValueOrFail(future, error);
                return;
            }
            try {
                retryScheduler.schedule(
                        () -> asyncPredictWithRetry(rowData, future, attemptNumber + 1),
                        delayMs,
                        TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException rejected) {
                // TOCTOU: scheduler was alive when we checked above but got shutdown between
                // isShutdown() and schedule() (close() racing against an in-flight callback).
                // Propagating would leave the AsyncIO future uncompleted and stall the operator,
                // so we must complete synchronously here.
                LOG.warn(
                        "Retry scheduler rejected task (operator closing?); failing request without further retries",
                        rejected);
                if (shouldRecordBreakerFailureOnFinalCompletion(
                        countAsBreakerFailure, attemptNumber)) {
                    recordFailure();
                }
                completeWithDefaultValueOrFail(future, error);
            }
        } else {
            // No more retries (exhausted or not retryable). See
            // shouldRecordBreakerFailureOnFinalCompletion for the accounting rule.
            if (shouldRecordBreakerFailureOnFinalCompletion(countAsBreakerFailure, attemptNumber)) {
                recordFailure();
            }

            // attemptNumber + 1 is the accurate attempt count: 1 for a non-retryable failure
            // at attempt 0, getMaxRetries()+1 when retries were exhausted.
            int attemptsMade = attemptNumber + 1;
            if (defaultValueConfigured) {
                LOG.warn(
                        "Triton inference failed after {} attempt(s). Returning configured default value. Original error: {}",
                        attemptsMade,
                        error.getMessage(),
                        error);
            } else {
                LOG.error(
                        "Triton inference failed after {} attempt(s). No default value configured.",
                        attemptsMade);
            }
            completeWithDefaultValueOrFail(future, error);
        }
    }
/**
- * Handles HTTP error responses and creates appropriate typed exceptions.
+ * Whether the final-completion site should invoke {@link
#recordFailure()} for this logical
+ * request.
+ *
+ * <p>Record when either the current-attempt failure is breaker-worthy
({@code
+ * countAsBreakerFailure}) or we are on a retry path ({@code attemptNumber
> 0}), since only
+ * breaker-worthy failures enter the retry path — so {@code attemptNumber
> 0} implies a prior
+ * deferred breaker-worthy failure still needs accounting.
+ *
+ * <p>Without the {@code attemptNumber > 0} clause, a 5xx on attempt 0
followed by a
+ * non-retryable failure (4xx / JSON parse / request-build bug) on a retry
would silently drop
+ * the original 5xx from breaker statistics — the same class of bug
previously fixed for the
+ * {@link TritonCircuitBreakerOpenException} short-circuit.
*
- * @param response The HTTP response with error status
- * @param future The future to complete exceptionally
- * @throws IOException If reading response body fails
+ * <p>Package-private + static for direct unit-test coverage of the
decision rule.
*/
- private void handleErrorResponse(
- Response response, CompletableFuture<Collection<RowData>> future)
throws IOException {
+ static boolean shouldRecordBreakerFailureOnFinalCompletion(
+ boolean countAsBreakerFailure, int attemptNumber) {
+ return countAsBreakerFailure || attemptNumber > 0;
+ }
+
+ /**
+ * Computes the exponential-backoff delay for the given attempt, clamped
to {@code
+ * retry-max-backoff} to prevent overflow and unreasonably long sleeps
when {@code max-retries}
+ * is large, and randomized with equal jitter to avoid a thundering herd
of concurrent retries.
+ *
+ * <p>The nominal delay is {@code base * 2^attempt}, clamped to {@code
capMs}. With equal
+ * jitter, the returned delay is drawn uniformly from {@code [nominal/2,
nominal]}: we keep half
+ * of the backoff as a deterministic floor (so the server still gets the
intended breathing room
+ * before the next burst) and randomize the other half to spread
concurrent retries across a
+ * time window. Without jitter a cluster of N parallel AsyncIO slots
failing on the same Triton
+ * outage would all retry at the exact same instant, producing a
synchronized thundering herd
+ * that defeats the purpose of exponential backoff.
+ *
+ * <p>Package-private + static so that the deterministic portion of the
algorithm can be
+ * exercised in isolation without standing up a full {@link
TritonInferenceModelFunction}
+ * fixture. See {@link #computeBackoffWithJitter(long)} for the jitter
wrapper.
+ *
+ * <p><b>Preconditions (enforced by callers):</b> {@code attemptNumber >=
0}, {@code baseMs >
+ * 0}, {@code capMs >= baseMs}. These are validated in {@link
AbstractTritonModelFunction#open}
+ * via {@link org.apache.flink.util.Preconditions}, so direct callers of
this static helper
+ * (including tests) must uphold the same contract — passing {@code baseMs
== 0} returns {@code
+ * 0}, but that is a degenerate configuration that the option-validation
layer rejects.
+ */
+ static long computeBackoffDelayMillis(int attemptNumber, long baseMs, long
capMs) {
+ // Cap the shift amount defensively. `1L << 62` already exceeds any
realistic base*cap
+ // product; beyond 62 the Java spec masks the shift amount (mod 64),
which would silently
+ // wrap and produce bogus delays. Clamp before the shift so the math
is always monotone.
+ int shift = Math.max(0, Math.min(attemptNumber, 30));
+ long multiplier = 1L << shift;
+
+ // Guard against base * multiplier overflow: if it would overflow, use
the cap directly.
+ long delay;
+ if (baseMs > 0 && multiplier > Long.MAX_VALUE / baseMs) {
+ delay = capMs;
+ } else {
+ delay = baseMs * multiplier;
+ }
+ return Math.min(delay, capMs);
+ }
+
+ /**
+ * Applies equal jitter to an already-computed deterministic backoff:
returns a value drawn
+ * uniformly from {@code [nominalDelayMs / 2, nominalDelayMs]}. See {@link
+ * #computeBackoffDelayMillis(int, long, long)} for the rationale.
+ *
+ * <p>Kept separate (and package-private) from the deterministic helper so
that existing unit
+ * tests can continue to assert exact values for the backoff curve, while
the randomized
+ * integration still exercises jitter in production paths.
+ *
+ * <p>Uses {@link ThreadLocalRandom} to avoid contention when many AsyncIO
slots retry in
+ * parallel — a shared {@link java.util.Random} would serialize on its
internal seed field and
+ * itself become a coordination point under exactly the high-concurrency
conditions that make
+ * jitter necessary.
+ */
+ static long computeBackoffWithJitter(long nominalDelayMs) {
+ if (nominalDelayMs <= 1L) {
+ // A delay of 0 or 1 ms is already so small that jitter would be
meaningless and
+ // could introduce negative / zero delays via truncation. Pass
through.
+ return nominalDelayMs;
+ }
+ long halfDelay = nominalDelayMs / 2L;
+ // nextLong(origin, bound) requires origin < bound; halfDelay >= 1 and
nominalDelayMs + 1
+ // is strictly greater, so this is safe.
+ return ThreadLocalRandom.current().nextLong(halfDelay, nominalDelayMs
+ 1L);
+ }
+
+ private long computeBackoffDelayMillis(int attemptNumber) {
+ long nominal =
+ computeBackoffDelayMillis(
+ attemptNumber,
+ getRetryInitialBackoff().toMillis(),
+ getRetryMaxBackoff().toMillis());
+ return computeBackoffWithJitter(nominal);
+ }
+
+ /**
+ * Completes the future with a freshly-built default-value result when
{@code default-value} is
+ * configured, or with the supplied error otherwise. Centralises the
"either / or" decision so
+ * that both the retry-exhausted path and the non-retryable (4xx,
circuit-breaker-open) path
+ * share one implementation.
+ *
+ * <p>A new {@link GenericRowData} / singleton {@link Collection} is built
per invocation — see
+ * {@link #cachedDefaultPayload} for why sharing a single pre-built
collection across all
+ * fallback emissions would be unsafe.
+ */
+ private void completeWithDefaultValueOrFail(
+ CompletableFuture<Collection<RowData>> future, Throwable error) {
+ if (defaultValueConfigured) {
+ future.complete(buildDefaultResult(cachedDefaultPayload));
+ } else {
+ future.completeExceptionally(error);
+ }
+ }
+
+ /**
+ * Builds a fresh singleton {@link Collection} wrapping a fresh {@link
GenericRowData} around
+ * the given payload. Exposed as package-private + static so that the
"every fallback emits a
+ * distinct instance" invariant can be asserted in isolation — see {@link
#cachedDefaultPayload}
+ * for why sharing a single pre-built collection across all fallback
emissions would be unsafe.
+ */
+ static Collection<RowData> buildDefaultResult(Object cachedPayload) {
+ return Collections.singletonList(GenericRowData.of(cachedPayload));
+ }
+
+ /**
+ * Handles HTTP error response with retry logic.
+ *
+ * @param response The HTTP response
+ * @param rowData Input data for inference
+ * @param future The future to complete
+ * @param attemptNumber Current attempt number
+ */
+ private void handleErrorResponseWithRetry(
+ Response response,
+ RowData rowData,
+ CompletableFuture<Collection<RowData>> future,
+ int attemptNumber) {
- String errorBody =
- response.body() != null ? response.body().string() : "No error
details provided";
int statusCode = response.code();
+ // Read the error body in its own try/catch so a body-read failure
does NOT lose the
+ // original HTTP status. Without this guard, an IOException from
response.body().string()
+ // would propagate out of onResponse() into the catch (Exception e)
handler, which routes
+ // failures as non-retryable / non-breaker — causing a genuine 5xx
(which is retryable
+ // and a real backend-health signal) to be silently downgraded.
+ String errorBody;
+ try {
+ errorBody =
+ response.body() != null
+ ? response.body().string()
+ : "No error details provided";
+ } catch (IOException bodyReadFailure) {
+ LOG.warn(
+ "Failed to read Triton error response body for HTTP {}
(attempt {}/{}); "
+ + "preserving status code for routing decision",
+ statusCode,
+ attemptNumber + 1,
+ getMaxRetries() + 1,
+ bodyReadFailure);
+ errorBody = "<error body unreadable: " +
bodyReadFailure.getMessage() + ">";
+ }
+
+ LOG.error(
+ "Triton inference failed (attempt {}/{}) with HTTP {}: {}",
+ attemptNumber + 1,
+ getMaxRetries() + 1,
+ statusCode,
+ errorBody);
+
// Build detailed error message with context
StringBuilder errorMsg = new StringBuilder();
errorMsg.append(
@@ -328,72 +785,156 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
errorBody.toLowerCase().contains("shape")
|| errorBody.toLowerCase().contains("dimension");
+ TritonException exception;
+
if (statusCode >= 400 && statusCode < 500) {
// Client error - user configuration issue
errorMsg.append("\n=== Troubleshooting (Client Error) ===\n");
if (statusCode == 400) {
- errorMsg.append(" • Verify input shape matches model's
config.pbtxt\n");
- errorMsg.append(" • For scalar: use
INT/FLOAT/DOUBLE/STRING\n");
- errorMsg.append(" • For 1-D tensor: use ARRAY<type>\n");
+ errorMsg.append(" - Verify input shape matches model's
config.pbtxt\n");
+ errorMsg.append(" - For scalar: use
INT/FLOAT/DOUBLE/STRING\n");
+ errorMsg.append(" - For 1-D tensor: use ARRAY<type>\n");
errorMsg.append(
- " • Try flatten-batch-dim=true if model expects [N]
but gets [1,N]\n");
+ " - Try flatten-batch-dim=true if model expects [N]
but gets [1,N]\n");
if (isShapeMismatch) {
- // Create schema exception for shape mismatches
- future.completeExceptionally(
+ exception =
new TritonSchemaException(
errorMsg.toString(),
"See Triton model config.pbtxt",
- String.format("Flink type: %s",
inputType)));
- return;
+ String.format("Flink type: %s",
inputType));
+ } else {
+ exception = new TritonClientException(errorMsg.toString(),
statusCode);
}
} else if (statusCode == 404) {
- errorMsg.append(" • Verify model-name:
").append(getModelName()).append("\n");
- errorMsg.append(" • Verify model-version: ")
+ errorMsg.append(" - Verify model-name:
").append(getModelName()).append("\n");
+ errorMsg.append(" - Verify model-version: ")
.append(getModelVersion())
.append("\n");
- errorMsg.append(" • Check model is loaded: GET ")
+ errorMsg.append(" - Check model is loaded: GET ")
.append(sanitizedEndpoint)
.append("\n");
+ exception = new TritonClientException(errorMsg.toString(),
statusCode);
} else if (statusCode == 401 || statusCode == 403) {
- errorMsg.append(" • Check auth-token configuration\n");
- errorMsg.append(" • Verify server authentication
requirements\n");
+ errorMsg.append(" - Check auth-token configuration\n");
+ errorMsg.append(" - Verify server authentication
requirements\n");
+ exception = new TritonClientException(errorMsg.toString(),
statusCode);
+ } else {
+ exception = new TritonClientException(errorMsg.toString(),
statusCode);
}
- future.completeExceptionally(
- new TritonClientException(errorMsg.toString(),
statusCode));
+ // 4xx is a persistent client-side configuration problem: no
retry, and a
+ // standalone 4xx is not fed to the breaker (would force-open on
user misconfig
+ // against an otherwise-healthy server). On a retry path
(attemptNumber > 0) the
+ // prior 5xx/network failure was deferred; flush it once here —
same rule as
+ // shouldRecordBreakerFailureOnFinalCompletion. Bypasses
handleFailureWithRetry
+ // because there's nothing to retry.
+ if (attemptNumber > 0) {
+ recordFailure();
+ }
+ if (defaultValueConfigured) {
+ LOG.warn(
+ "Client error (HTTP {}). Returning default value.
Original error: {}",
+ statusCode,
+ exception.getMessage(),
+ exception);
+ } else {
+ // Make the "not retrying, no fallback, propagating exception"
decision
+ // explicit in the operator log so that operators chasing a
misconfiguration
+ // don't have to infer it from the absence of a retry log line.
+ LOG.error(
+ "Client error (HTTP {}) is non-retryable and no
default-value is configured; "
+ + "propagating exception to AsyncIO",
+ statusCode);
+ }
+ completeWithDefaultValueOrFail(future, exception);
} else if (statusCode >= 500 && statusCode < 600) {
- // Server error - Triton service issue
+ // Server error - Triton service issue - retryable
errorMsg.append("\n=== Troubleshooting (Server Error) ===\n");
if (statusCode == 500) {
- errorMsg.append(" • Check Triton server logs for inference
crash details\n");
- errorMsg.append(" • Model may have run out of memory\n");
- errorMsg.append(" • Input data may trigger model bug\n");
+ errorMsg.append(" - Check Triton server logs for inference
crash details\n");
+ errorMsg.append(" - Model may have run out of memory\n");
+ errorMsg.append(" - Input data may trigger model bug\n");
} else if (statusCode == 503) {
- errorMsg.append(" • Server is overloaded or unavailable\n");
- errorMsg.append(" • This error is retryable with backoff\n");
- errorMsg.append(" • Consider scaling Triton server
resources\n");
+ errorMsg.append(" - Server is overloaded or unavailable\n");
+ errorMsg.append(" - This error is retryable with backoff\n");
+ errorMsg.append(" - Consider scaling Triton server
resources\n");
} else if (statusCode == 504) {
- errorMsg.append(" • Inference exceeded gateway timeout\n");
- errorMsg.append(" • This error is retryable\n");
- errorMsg.append(" • Consider increasing timeout
configuration\n");
+ errorMsg.append(" - Inference exceeded gateway timeout\n");
+ errorMsg.append(" - This error is retryable\n");
+ errorMsg.append(" - Consider increasing timeout
configuration\n");
}
- future.completeExceptionally(
- new TritonServerException(errorMsg.toString(),
statusCode));
+ exception = new TritonServerException(errorMsg.toString(),
statusCode);
+ // 5xx is a genuine backend-health signal: count against the
breaker and retry.
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ exception,
+ /* countAsBreakerFailure */ true,
+ /* retryable */ true);
} else {
// Unexpected status code
errorMsg.append("\n=== Unexpected Status Code ===\n");
- errorMsg.append(" • This status code is not standard for
Triton\n");
- errorMsg.append(" • Check if proxy/load balancer is involved\n");
+ errorMsg.append(" - This status code is not standard for
Triton\n");
+ errorMsg.append(" - Check if proxy/load balancer is involved\n");
+
+ exception = new TritonClientException(errorMsg.toString(),
statusCode);
+ // Non-standard status - could be a misbehaving proxy. Retry
transparently but do
+ // not conflate with backend health (the real Triton server never
produced it).
+ handleFailureWithRetry(
+ rowData,
+ future,
+ attemptNumber,
+ exception,
+ /* countAsBreakerFailure */ false,
+ /* retryable */ true);
+ }
+ }
- future.completeExceptionally(
- new TritonClientException(errorMsg.toString(),
statusCode));
+ /**
+ * Parses the configured default-value string into the single field
payload that will be wrapped
+ * into a fresh {@link GenericRowData} on every fallback emission.
+ *
+ * <p>Called once in {@link #open(FunctionContext)} so that a malformed
default-value fails the
+ * job at startup rather than masking the original inference error at
runtime, and so that the
+ * hot fallback path performs no JSON parsing.
+ *
+ * <p>We return a single payload object (not a {@code
Collection<RowData>}) because sharing a
+ * pre-built {@link RowData} across every failing record would alias
mutable row state across
+ * emitted rows — see the field-level Javadoc on {@link
#cachedDefaultPayload} for details.
+ *
+ * @param defaultValueStr The default-value string to parse
+ * @return The deserialized payload for the single output column (may be
{@code null} when the
+ * user configures the literal string {@code 'null'} to request a
SQL-NULL fallback)
+ * @throws JsonProcessingException If parsing the JSON default value fails
+ */
+ private Object parseDefaultPayload(String defaultValueStr) throws
JsonProcessingException {
+ // Accept the literal string 'null' as an explicit SQL-NULL fallback
for every output
+ // type. Users who leave the option unset disable the fallback
entirely (exceptions are
+ // propagated), so we need an affirmative way to say "fall back to
NULL". Handling it
+ // uniformly here avoids the surprising difference between STRING
(explicitly special-
+ // cased) and ARRAY/numeric (where `objectMapper.readTree("null")`
returned a NullNode
+ // whose downstream mapping was implicit and undocumented).
+ //
+ // NOTE: As a consequence, a VARCHAR model cannot use the literal
string "null" (lower-
+ // case) as a non-null failure sentinel - it will always be
interpreted as SQL NULL.
+ // Users who need that exact sentinel should choose a different marker
(e.g. "NULL",
+ // "FAILED", "<null>"); this trade-off is documented in
TritonOptions.DEFAULT_VALUE.
+ if ("null".equals(defaultValueStr)) {
+ return null;
+ }
+ if (outputType instanceof VarCharType) {
+ return BinaryStringData.fromString(defaultValueStr);
}
+ // Array and other scalar types - parse JSON.
+ JsonNode jsonNode = objectMapper.readTree(defaultValueStr);
+ return TritonTypeMapper.deserializeFromJson(jsonNode, outputType);
}
private String buildInferenceRequest(RowData rowData) throws
JsonProcessingException {
@@ -511,8 +1052,21 @@ public class TritonInferenceModelFunction extends
AbstractTritonModelFunction {
LOG.warn("No outputs found in Triton response");
}
- // If no outputs found, return default value based on type
+ // If no outputs were produced, prefer the user-configured default
value when available so
+ // that downstream operators see the same fallback payload as on
retry-exhausted and
+ // breaker-open paths. Only when no default-value is configured do we
fall back to the
+ // previous "type-specific empty sentinel" behaviour — but in that
case we also warn,
+ // because an empty / null row can easily masquerade as a successful
prediction and
+ // propagate silently through aggregations.
if (results.isEmpty()) {
+ if (defaultValueConfigured) {
+ LOG.warn("Triton response contained no outputs; emitting
configured default value");
+ return buildDefaultResult(cachedDefaultPayload);
+ }
+ LOG.warn(
+ "Triton response contained no outputs and no default-value
is configured; "
+ + "emitting a type-specific empty sentinel.
Configure default-value "
+ + "to get a more explicit fallback.");
Object defaultValue;
if (outputType instanceof VarCharType) {
defaultValue = BinaryStringData.EMPTY_UTF8;
diff --git
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
index 516ec899b76..4fd6139a245 100644
---
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
+++
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
@@ -70,6 +70,11 @@ public class TritonModelProviderFactory implements
ModelProviderFactory {
set.add(TritonOptions.COMPRESSION);
set.add(TritonOptions.AUTH_TOKEN);
set.add(TritonOptions.CUSTOM_HEADERS);
+ // Retry and default-value fallback
+ set.add(TritonOptions.MAX_RETRIES);
+ set.add(TritonOptions.RETRY_INITIAL_BACKOFF);
+ set.add(TritonOptions.RETRY_MAX_BACKOFF);
+ set.add(TritonOptions.DEFAULT_VALUE);
return set;
}
diff --git
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
index f28a27a8b8b..d8831574c73 100644
---
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
+++
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
@@ -253,4 +253,89 @@ public class TritonOptions {
"Number of successful test requests required in
HALF_OPEN state to close the circuit. "
+ "If any request fails in HALF_OPEN
state, the circuit immediately reopens. "
+ "Defaults to 3 requests. Only effective
when circuit-breaker-enabled is true.");
+
+ // ==================== Retry and Default Value Fallback Options
====================
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Integer> MAX_RETRIES =
+ ConfigOptions.key("max-retries")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Maximum number of retries (additional attempts
beyond the first) "
+ + "for failed inference requests. With
max-retries=2 the "
+ + "request will be attempted up to 3 times
in total "
+ + "(1 initial attempt + 2 retries). When
set to 0 (default), "
+ + "no retry is performed. Only transient
failures are "
+ + "retried: network errors and 5xx
responses. Client-side "
+ + "4xx errors, response parsing failures,
and circuit "
+ + "breaker OPEN failures are never retried
because they "
+ + "indicate a persistent condition. Must
be >= 0.");
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Duration> RETRY_INITIAL_BACKOFF =
+ ConfigOptions.key("retry-initial-backoff")
+ .durationType()
+ .defaultValue(Duration.ofMillis(100))
+ .withDescription(
+ "Initial backoff duration between retry attempts.
Uses exponential "
+ + "backoff with equal jitter: the nominal
delay is "
+ + "initial-backoff * 2^attempt (first
retry waits this "
+ + "duration, second retry waits 2x, third
waits 4x, and so "
+ + "on), clamped to retry-max-backoff, then
randomized in "
+ + "the range [delay/2, delay] to prevent a
thundering herd "
+ + "of concurrent retries hitting the
server at the exact "
+ + "same instant. Defaults to 100ms. Must
be > 0.");
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<Duration> RETRY_MAX_BACKOFF =
+ ConfigOptions.key("retry-max-backoff")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription(
+ "Upper bound on the delay between retry attempts.
Exponential "
+ + "backoff computed from
retry-initial-backoff is clamped "
+ + "to this value so that a misconfigured
max-retries "
+ + "cannot produce hours-long sleeps or
overflow the delay "
+ + "computation. Defaults to 30s. Must be
>= "
+ + "retry-initial-backoff.");
+
+ @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+ public static final ConfigOption<String> DEFAULT_VALUE =
+ ConfigOptions.key("default-value")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Fallback value to return when all
retry attempts "
+ + "fail (transient errors)
or when the request "
+ + "fails with a
non-retryable error (4xx). "
+ + "This allows downstream
processing to "
+ + "distinguish between
successful and failed "
+ + "predictions without
propagating exceptions. "
+ + "Format depends on
output type: for STRING "
+ + "use plain text (e.g.
%s); for numeric types "
+ + "use string
representation (e.g. %s); for "
+ + "ARRAY types use JSON
array format (e.g. %s); "
+ + "for SQL NULL use the
literal %s. "
+ + "Note: the lower-case
literal %s is ALWAYS "
+ + "interpreted as SQL NULL
and cannot be used "
+ + "as a STRING sentinel;
if you need a "
+ + "string-typed sentinel
indicating failure, "
+ + "use %s, %s or %s
instead. The value "
+ + "is parsed once at
operator initialization; "
+ + "an unparseable value
fails the job at "
+ + "startup rather than at
the first error. "
+ + "If not specified,
exceptions are thrown on "
+ + "failure.",
+ code("'FAILED'"),
+ code("'-1'"),
+ code("'[0.0, 0.0]'"),
+ code("'null'"),
+ code("'null'"),
+ code("'NULL'"),
+ code("'FAILED'"),
+ code("'<null>'"))
+ .build());
}
diff --git
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonBreakerFailureAccountingTest.java
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonBreakerFailureAccountingTest.java
new file mode 100644
index 00000000000..722644bfd66
--- /dev/null
+++
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonBreakerFailureAccountingTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link
+ *
TritonInferenceModelFunction#shouldRecordBreakerFailureOnFinalCompletion(boolean,
int)}.
+ *
+ * <p>Guards the "one logical request, one breaker entry" contract: a
breaker-worthy failure
+ * deferred on attempt 0 must be flushed even when the retry lands on a
non-breaker-worthy branch
+ * (4xx / parse / request-build bug).
+ */
+class TritonBreakerFailureAccountingTest {
+
+ @Test
+ void testStandaloneBreakerWorthyFailureIsRecorded() {
+ // 5xx / network on first attempt with no retries: record directly.
+ assertThat(
+
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+ true, 0))
+ .isTrue();
+ }
+
+ @Test
+ void testStandaloneNonBreakerFailureIsNotRecorded() {
+ // 4xx / parse error on attempt 0: not a backend-health signal.
+ assertThat(
+
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+ false, 0))
+ .isFalse();
+ }
+
+ @Test
+ void testExhaustedRetriesStillRecord() {
+ // Multiple retries all breaker-worthy: record once.
+ assertThat(
+
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+ true, 3))
+ .isTrue();
+ }
+
+ @Test
+ void testRetryLandingOnNonRetryableStillRecordsDeferred() {
+ // Regression for the bug this fix addresses: 5xx on attempt 0 → retry
→ attempt 1
+ // fails with 4xx / parse error. countAsBreakerFailure=false for the
final attempt, but
+ // the prior 5xx was deferred and must still be recorded.
+ assertThat(
+
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+ false, 1))
+ .isTrue();
+ }
+}
diff --git
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonDefaultValueFallbackTest.java
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonDefaultValueFallbackTest.java
new file mode 100644
index 00000000000..1befc12c743
--- /dev/null
+++
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonDefaultValueFallbackTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link
TritonInferenceModelFunction#buildDefaultResult(Object)}.
+ *
+ * <p>Regression guard for a subtle correctness bug: a previous implementation
cached a single
+ * {@code Collection<RowData>} (wrapping a single {@code GenericRowData}) and
handed that same
+ * instance to every failing record. Because downstream operators may retain
references to the
+ * emitted rows (keyed aggregations, changelog collectors, serializers that
reuse field storage),
+ * aliasing a single row across every fallback emission silently violates the
"each emitted row is
+ * independent" contract.
+ */
+class TritonDefaultValueFallbackTest {
+
+ @Test
+ void testEachCallReturnsFreshCollection() {
+ // Aliasing the same Collection across fallbacks would let a
downstream operator that
+ // (legitimately) mutates its input collection (e.g. addAll / clear on
the list it
+ // receives) corrupt future fallback emissions. Ensure every call
produces a distinct
+ // container.
+ Object payload = BinaryStringData.fromString("FAILED");
+ Collection<RowData> first =
TritonInferenceModelFunction.buildDefaultResult(payload);
+ Collection<RowData> second =
TritonInferenceModelFunction.buildDefaultResult(payload);
+
+ assertThat(first).isNotSameAs(second);
+ }
+
+ @Test
+ void testEachCallReturnsFreshRow() {
+ // GenericRowData carries mutable RowKind plus a mutable field array;
aliasing a single
+ // row into every emitted record would leak RowKind changes
(insert/update/delete) and
+ // any per-record field overwrites back into the cached fallback,
poisoning subsequent
+ // emissions. Assert that the row instance itself is distinct per call.
+ Object payload = BinaryStringData.fromString("FAILED");
+ RowData first =
firstRow(TritonInferenceModelFunction.buildDefaultResult(payload));
+ RowData second =
firstRow(TritonInferenceModelFunction.buildDefaultResult(payload));
+
+ assertThat(first).isNotSameAs(second);
+ }
+
+ @Test
+ void testNullPayloadProducesSqlNullField() {
+ // When the user configures `default-value='null'`,
parseDefaultPayload returns a Java
+ // null. Wrapping that in GenericRowData must produce a single-column
row whose only
+ // field reads back as SQL NULL - the "no fallback configured"
semantics must be carried
+ // by defaultValueConfigured, not by the payload value itself.
+ Collection<RowData> result =
TritonInferenceModelFunction.buildDefaultResult(null);
+ RowData row = firstRow(result);
+
+ assertThat(row.getArity()).isEqualTo(1);
+ assertThat(row.isNullAt(0)).isTrue();
+ }
+
+ @Test
+ void testDownstreamMutationOfPriorResultDoesNotAffectNextResult() {
+ // End-to-end assertion of the non-aliasing invariant: mutating the
row emitted by call
+ // N (here by overwriting its field and flipping RowKind) must leave
call N+1 pristine.
+ // Triton's fallback contract is "each record sees a clean default
row".
+ Object payload = BinaryStringData.fromString("FAILED");
+ Collection<RowData> first =
TritonInferenceModelFunction.buildDefaultResult(payload);
+ GenericRowData firstRow = (GenericRowData) firstRow(first);
+ firstRow.setField(0, BinaryStringData.fromString("HIJACKED"));
+ firstRow.setRowKind(org.apache.flink.types.RowKind.DELETE);
+
+ RowData second =
firstRow(TritonInferenceModelFunction.buildDefaultResult(payload));
+ assertThat(second.getString(0).toString()).isEqualTo("FAILED");
+
assertThat(second.getRowKind()).isEqualTo(org.apache.flink.types.RowKind.INSERT);
+ }
+
+ private static RowData firstRow(Collection<RowData> collection) {
+ assertThat(collection).hasSize(1);
+ Iterator<RowData> it = collection.iterator();
+ return it.next();
+ }
+}
diff --git
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
index 28f98fd0308..7fab9ce7446 100644
---
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
+++
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
@@ -42,7 +42,7 @@ class TritonModelProviderFactoryTest {
void testOptionalOptions() {
TritonModelProviderFactory factory = new TritonModelProviderFactory();
assertThat(factory.optionalOptions())
- .hasSize(10)
+ .hasSize(14)
.containsExactlyInAnyOrder(
TritonOptions.MODEL_VERSION,
TritonOptions.TIMEOUT,
@@ -53,6 +53,10 @@ class TritonModelProviderFactoryTest {
TritonOptions.SEQUENCE_END,
TritonOptions.COMPRESSION,
TritonOptions.AUTH_TOKEN,
- TritonOptions.CUSTOM_HEADERS);
+ TritonOptions.CUSTOM_HEADERS,
+ TritonOptions.MAX_RETRIES,
+ TritonOptions.RETRY_INITIAL_BACKOFF,
+ TritonOptions.RETRY_MAX_BACKOFF,
+ TritonOptions.DEFAULT_VALUE);
}
}
diff --git
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryBackoffTest.java
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryBackoffTest.java
new file mode 100644
index 00000000000..f5f5dd7339d
--- /dev/null
+++
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryBackoffTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link
TritonInferenceModelFunction#computeBackoffDelayMillis(int, long, long)}.
+ *
+ * <p>The retry backoff algorithm has several boundary conditions that are
easy to regress on (shift
+ * overflow when attempt is very large, multiplication overflow when base*2^n
wraps negative, cap
+ * monotonicity). Exercising it here without the full operator fixture keeps
the tests fast and the
+ * failure modes sharply diagnosable.
+ */
+class TritonRetryBackoffTest {
+
+ @Test
+ void testFirstAttemptReturnsBase() {
+ // attempt 0 corresponds to "first retry waits base" in the docstring.
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(0,
100L, 30_000L))
+ .isEqualTo(100L);
+ }
+
+ @Test
+ void testExponentialDoubling() {
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(1,
100L, 30_000L))
+ .isEqualTo(200L);
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(2,
100L, 30_000L))
+ .isEqualTo(400L);
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(3,
100L, 30_000L))
+ .isEqualTo(800L);
+ }
+
+ @Test
+ void testCapIsEnforced() {
+ // 100ms * 2^10 = ~102s, capped to 30s.
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(10,
100L, 30_000L))
+ .isEqualTo(30_000L);
+ }
+
+ @Test
+ void testLargeAttemptNumberDoesNotOverflow() {
+ // Java masks the shift amount mod 64, so without the clamp this would
wrap and produce
+ // an incorrect delay. The output must still equal the cap.
+ long cap = 30_000L;
+
assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(1000, 100L,
cap))
+ .isEqualTo(cap);
+ assertThat(
+ TritonInferenceModelFunction.computeBackoffDelayMillis(
+ Integer.MAX_VALUE, 100L, cap))
+ .isEqualTo(cap);
+ }
+
+ @Test
+ void testBaseTimesMultiplierOverflowFallsBackToCap() {
+ // Long.MAX_VALUE * 2 would overflow; result must be the cap, not a
negative number.
+ long base = Long.MAX_VALUE / 2;
+ long cap = 1_000L;
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(5,
base, cap))
+ .isEqualTo(cap);
+ }
+
+ @Test
+ void testNegativeAttemptNumberIsTreatedAsZero() {
+ // Defensive clamp: a negative attempt counter (should be impossible
in production but
+ // cheap to guard) must not trigger `1L << -1` (which equals `1L <<
63`).
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(-1,
100L, 30_000L))
+ .isEqualTo(100L);
+ }
+
+ @Test
+ void testDelayIsMonotonic() {
+ long base = 50L;
+ long cap = 10_000L;
+ long previous = 0L;
+ for (int attempt = 0; attempt < 20; attempt++) {
+ long delay =
TritonInferenceModelFunction.computeBackoffDelayMillis(attempt, base, cap);
+ assertThat(delay)
+ .as("delay must be non-decreasing across attempts
(attempt=%d)", attempt)
+ .isGreaterThanOrEqualTo(previous);
+ assertThat(delay)
+ .as("delay must never exceed cap (attempt=%d)", attempt)
+ .isLessThanOrEqualTo(cap);
+ previous = delay;
+ }
+ }
+
+ @Test
+ void testZeroBaseReturnsZero() {
+ // Degenerate contract: option validation normally forbids baseMs ==
0, but the static
+ // helper must still be total. Any attempt with base=0 produces 0 (0
<< n == 0), and we
+ // explicitly skip the overflow guard because `Long.MAX_VALUE / 0`
would throw.
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(0,
0L, 30_000L))
+ .isEqualTo(0L);
+ assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(5,
0L, 30_000L))
+ .isEqualTo(0L);
+ }
+
+ @Test
+ void testCapEqualsBaseAlwaysReturnsBase() {
+ // When the user configures retry-initial-backoff ==
retry-max-backoff, the cap clamps
+ // every attempt to the base delay (effectively a fixed-delay retry
schedule).
+ long fixed = 500L;
+ for (int attempt = 0; attempt < 10; attempt++) {
+ assertThat(
+
TritonInferenceModelFunction.computeBackoffDelayMillis(
+ attempt, fixed, fixed))
+ .as("cap == base collapses to fixed delay (attempt=%d)",
attempt)
+ .isEqualTo(fixed);
+ }
+ }
+
+ @Test
+ void testJitterStaysWithinEqualJitterWindow() {
+ // Equal jitter: the returned value must land in [nominal/2, nominal]
inclusive. A strict
+ // probabilistic bound check over many samples is a cheap, non-flaky
way to verify the
+ // window without asserting the exact distribution.
+ long nominal = 1000L;
+ for (int i = 0; i < 10_000; i++) {
+ long jittered =
TritonInferenceModelFunction.computeBackoffWithJitter(nominal);
+ assertThat(jittered)
+ .as("jitter must not fall below nominal/2 (iteration=%d)",
i)
+ .isGreaterThanOrEqualTo(nominal / 2L);
+ assertThat(jittered)
+ .as("jitter must not exceed nominal (iteration=%d)", i)
+ .isLessThanOrEqualTo(nominal);
+ }
+ }
+
+ @Test
+ void testJitterProducesSpreadToAvoidThunderingHerd() {
+ // A deterministic computeBackoffWithJitter would defeat the whole
point of jitter and
+ // cause a thundering herd. Sample a moderate number of draws and
require at least a few
+ // distinct values — with a uniform integer range of ~500 values the
probability of all
+ // 100 draws being identical is astronomically small (< 2^-800).
+ long nominal = 1000L;
+ java.util.Set<Long> distinct = new java.util.HashSet<>();
+ for (int i = 0; i < 100; i++) {
+
distinct.add(TritonInferenceModelFunction.computeBackoffWithJitter(nominal));
+ }
+ assertThat(distinct)
+ .as("jitter must introduce variation across calls, got only
%s", distinct)
+ .hasSizeGreaterThan(1);
+ }
+
+ @Test
+ void testJitterPassesThroughTrivialDelays() {
+ // 0 and 1 ms delays are too small for meaningful jitter and must pass
through unchanged
+ // so that a degenerate config does not accidentally return a negative
or truncated
+ // value.
+
assertThat(TritonInferenceModelFunction.computeBackoffWithJitter(0L)).isEqualTo(0L);
+
assertThat(TritonInferenceModelFunction.computeBackoffWithJitter(1L)).isEqualTo(1L);
+ }
+}
diff --git
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryConfigValidationTest.java
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryConfigValidationTest.java
new file mode 100644
index 00000000000..28b131c2bd1
--- /dev/null
+++
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryConfigValidationTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.ModelProviderFactory;
+import org.apache.flink.table.factories.utils.FactoryMocks;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Validates that retry-config validation in {@link
AbstractTritonModelFunction}'s constructor is
+ * gated on whether retries are actually enabled.
+ *
+ * <p>Background: previously the backoff-duration checks ran unconditionally,
so a user who
+ * explicitly disabled retries with {@code max-retries=0} would still be
rejected if their
+ * (otherwise unused) backoff settings were degenerate. Backoff durations only
matter when {@code
+ * maxRetries > 0}, so validating them with retries disabled is wrong.
+ */
+class TritonRetryConfigValidationTest {
+
+ private static final ResolvedSchema INPUT_SCHEMA =
+ ResolvedSchema.of(Column.physical("input", DataTypes.STRING()));
+
+ private static final ResolvedSchema OUTPUT_SCHEMA =
+ ResolvedSchema.of(Column.physical("output", DataTypes.STRING()));
+
+ @Test
+ void testZeroRetriesSkipsBackoffValidation() {
+ // With retries disabled, retry-initial-backoff=0ms is meaningless but
harmless: the
+ // backoff is never consulted. The constructor must accept it instead
of failing the job
+ // submission for an unused setting.
+ Map<String, String> options = baseOptions();
+ options.put(TritonOptions.MAX_RETRIES.key(), "0");
+ options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "0 ms");
+
+ ModelProviderFactory.Context context =
+ FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA,
options);
+
+ // Construction must succeed; previously this threw
IllegalArgumentException.
+ TritonInferenceModelFunction function =
+ new TritonInferenceModelFunction(context,
contextConfig(context));
+ assertThat(function).isNotNull();
+ }
+
+ @Test
+ void testZeroRetriesSkipsMaxBackoffValidation() {
+ // Same rationale as above for retry-max-backoff. Note we also set the
initial backoff
+ // to a positive value to isolate the max-backoff check.
+ Map<String, String> options = baseOptions();
+ options.put(TritonOptions.MAX_RETRIES.key(), "0");
+ options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "100 ms");
+ options.put(TritonOptions.RETRY_MAX_BACKOFF.key(), "0 ms");
+
+ ModelProviderFactory.Context context =
+ FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA,
options);
+
+ TritonInferenceModelFunction function =
+ new TritonInferenceModelFunction(context,
contextConfig(context));
+ assertThat(function).isNotNull();
+ }
+
+ @Test
+ void testRetriesEnabledStillRejectsZeroBackoff() {
+ // Regression guard for the gating: as soon as retries are enabled,
backoff validation
+ // must run. Otherwise a misconfigured initial-backoff would silently
produce zero-delay
+ // retry storms in production.
+ Map<String, String> options = baseOptions();
+ options.put(TritonOptions.MAX_RETRIES.key(), "3");
+ options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "0 ms");
+
+ ModelProviderFactory.Context context =
+ FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA,
options);
+
+ assertThatThrownBy(() -> new TritonInferenceModelFunction(context,
contextConfig(context)))
+ .isInstanceOf(IllegalArgumentException.class)
+
.hasMessageContaining(TritonOptions.RETRY_INITIAL_BACKOFF.key());
+ }
+
+ @Test
+ void testRetriesEnabledStillRejectsMaxBackoffSmallerThanInitial() {
+ Map<String, String> options = baseOptions();
+ options.put(TritonOptions.MAX_RETRIES.key(), "3");
+ options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "500 ms");
+ options.put(TritonOptions.RETRY_MAX_BACKOFF.key(), "100 ms");
+
+ ModelProviderFactory.Context context =
+ FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA,
options);
+
+ assertThatThrownBy(() -> new TritonInferenceModelFunction(context,
contextConfig(context)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(TritonOptions.RETRY_MAX_BACKOFF.key());
+ }
+
+ @Test
+ void testNegativeMaxRetriesAlwaysRejected() {
+ // The maxRetries >= 0 invariant guards the gate itself; without it, a
negative value
+ // would skip backoff validation AND blow up downstream (e.g. 1L << -1
in the backoff
+ // calculation). Verify it's still enforced.
+ Map<String, String> options = baseOptions();
+ options.put(TritonOptions.MAX_RETRIES.key(), "-1");
+
+ ModelProviderFactory.Context context =
+ FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA,
options);
+
+ assertThatThrownBy(() -> new TritonInferenceModelFunction(context,
contextConfig(context)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(TritonOptions.MAX_RETRIES.key());
+ }
+
+ private static Map<String, String> baseOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put("provider", TritonModelProviderFactory.IDENTIFIER);
+ options.put(TritonOptions.ENDPOINT.key(), "http://localhost:8000");
+ options.put(TritonOptions.MODEL_NAME.key(), "test-model");
+ return options;
+ }
+
+ private static Configuration contextConfig(ModelProviderFactory.Context
context) {
+ Configuration config = new Configuration();
+ context.getCatalogModel().getOptions().forEach(config::setString);
+ return config;
+ }
+}
diff --git
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryOptionsTest.java
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryOptionsTest.java
new file mode 100644
index 00000000000..e9f48616c85
--- /dev/null
+++
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryOptionsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that the retry / default-value {@link TritonOptions} expose the
contracts users rely on:
+ * stable option keys (changing them silently breaks pipelines) and the
documented default values.
+ */
+class TritonRetryOptionsTest {
+
+ @Test
+ void testMaxRetriesKeyAndDefault() {
+ assertThat(TritonOptions.MAX_RETRIES.key()).isEqualTo("max-retries");
+ assertThat(TritonOptions.MAX_RETRIES.defaultValue()).isZero();
+ }
+
+ @Test
+ void testRetryInitialBackoffKeyAndDefault() {
+
assertThat(TritonOptions.RETRY_INITIAL_BACKOFF.key()).isEqualTo("retry-initial-backoff");
+ assertThat(TritonOptions.RETRY_INITIAL_BACKOFF.defaultValue())
+ .isEqualTo(Duration.ofMillis(100));
+ }
+
+ @Test
+ void testRetryMaxBackoffKeyAndDefault() {
+
assertThat(TritonOptions.RETRY_MAX_BACKOFF.key()).isEqualTo("retry-max-backoff");
+ assertThat(TritonOptions.RETRY_MAX_BACKOFF.defaultValue())
+ .isEqualTo(Duration.ofSeconds(30));
+ }
+
+ @Test
+ void testDefaultValueKeyAndNoDefault() {
+
assertThat(TritonOptions.DEFAULT_VALUE.key()).isEqualTo("default-value");
+ // When not configured, default-value must be null so the operator can
distinguish
+ // "fallback enabled" from "no fallback configured".
+ assertThat(TritonOptions.DEFAULT_VALUE.defaultValue()).isNull();
+ }
+
+ @Test
+ void testRetryInitialBackoffDefaultIsLessThanOrEqualToMax() {
+ // Regression guard for a subtle misconfiguration: if the default of
RETRY_INITIAL_BACKOFF
+ // ever exceeded RETRY_MAX_BACKOFF's default, every out-of-the-box job
would fail
+ // validation in AbstractTritonModelFunction's constructor.
+ assertThat(TritonOptions.RETRY_INITIAL_BACKOFF.defaultValue())
+
.isLessThanOrEqualTo(TritonOptions.RETRY_MAX_BACKOFF.defaultValue());
+ }
+}