This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e329586e365 [FLINK-39225][model] Add retry support for triton 
inference (#27561)
e329586e365 is described below

commit e329586e365f9da34601832654c5352211d401cf
Author: Feat Zhang <[email protected]>
AuthorDate: Thu May 7 19:51:32 2026 +0800

    [FLINK-39225][model] Add retry support for triton inference (#27561)
---
 .../generated/model_triton_advanced_section.html   |  24 +
 .../shortcodes/generated/triton_configuration.html |  24 +
 flink-models/flink-model-triton/pom.xml            |  11 +
 .../model/triton/AbstractTritonModelFunction.java  | 129 +++-
 .../model/triton/TritonInferenceModelFunction.java | 700 ++++++++++++++++++---
 .../model/triton/TritonModelProviderFactory.java   |   5 +
 .../apache/flink/model/triton/TritonOptions.java   |  85 +++
 .../triton/TritonBreakerFailureAccountingTest.java |  71 +++
 .../triton/TritonDefaultValueFallbackTest.java     | 103 +++
 .../triton/TritonModelProviderFactoryTest.java     |   8 +-
 .../flink/model/triton/TritonRetryBackoffTest.java | 171 +++++
 .../triton/TritonRetryConfigValidationTest.java    | 148 +++++
 .../flink/model/triton/TritonRetryOptionsTest.java |  68 ++
 13 files changed, 1467 insertions(+), 80 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/model_triton_advanced_section.html 
b/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
index 794b20941ff..0d14b114b62 100644
--- a/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
+++ b/docs/layouts/shortcodes/generated/model_triton_advanced_section.html
@@ -50,6 +50,12 @@
             <td>Map</td>
             <td>Custom HTTP headers as key-value pairs. Example: <code 
class="highlighter-rouge">'X-Custom-Header:value,X-Another:value2'</code></td>
         </tr>
+        <tr>
+            <td><h5>default-value</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Fallback value to return when all retry attempts fail 
(transient errors) or when the request fails with a non-retryable error (4xx). 
This allows downstream processing to distinguish between successful and failed 
predictions without propagating exceptions. Format depends on output type: for 
STRING use plain text (e.g. <code class="highlighter-rouge">'FAILED'</code>); 
for numeric types use string representation (e.g. <code 
class="highlighter-rouge">'-1'</code>); for ARRAY t [...]
+        </tr>
         <tr>
             <td><h5>flatten-batch-dim</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -68,12 +74,30 @@
             <td>Duration</td>
             <td>Interval between health check requests. Shorter intervals 
provide faster failure detection but increase server load. Defaults to 30 
seconds. Only effective when health-check-enabled is true.</td>
         </tr>
+        <tr>
+            <td><h5>max-retries</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Integer</td>
+            <td>Maximum number of retries (additional attempts beyond the 
first) for failed inference requests. With max-retries=2 the request will be 
attempted up to 3 times in total (1 initial attempt + 2 retries). When set to 0 
(default), no retry is performed. Only transient failures are retried: network 
errors and 5xx responses. Client-side 4xx errors, response parsing failures, 
and circuit breaker OPEN failures are never retried because they indicate a 
persistent condition. Must be [...]
+        </tr>
         <tr>
             <td><h5>priority</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
             <td>Request priority level (0-255). Higher values indicate higher 
priority.</td>
         </tr>
+        <tr>
+            <td><h5>retry-initial-backoff</h5></td>
+            <td style="word-wrap: break-word;">100 ms</td>
+            <td>Duration</td>
+            <td>Initial backoff duration between retry attempts. Uses 
exponential backoff with equal jitter: the nominal delay is initial-backoff * 
2^attempt (first retry waits this duration, second retry waits 2x, third waits 
4x, and so on), clamped to retry-max-backoff, then randomized in the range 
[delay/2, delay] to prevent a thundering herd of concurrent retries hitting the 
server at the exact same instant. Defaults to 100ms. Must be &gt; 0.</td>
+        </tr>
+        <tr>
+            <td><h5>retry-max-backoff</h5></td>
+            <td style="word-wrap: break-word;">30 s</td>
+            <td>Duration</td>
+            <td>Upper bound on the delay between retry attempts. Exponential 
backoff computed from retry-initial-backoff is clamped to this value so that a 
misconfigured max-retries cannot produce hours-long sleeps or overflow the 
delay computation. Defaults to 30s. Must be &gt;= retry-initial-backoff.</td>
+        </tr>
         <tr>
             <td><h5>sequence-end</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/triton_configuration.html 
b/docs/layouts/shortcodes/generated/triton_configuration.html
index 2ebfac39733..882d9d3f548 100644
--- a/docs/layouts/shortcodes/generated/triton_configuration.html
+++ b/docs/layouts/shortcodes/generated/triton_configuration.html
@@ -50,6 +50,12 @@
             <td>Map</td>
             <td>Custom HTTP headers as key-value pairs. Example: <code 
class="highlighter-rouge">'X-Custom-Header:value,X-Another:value2'</code></td>
         </tr>
+        <tr>
+            <td><h5>default-value</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Fallback value to return when all retry attempts fail 
(transient errors) or when the request fails with a non-retryable error (4xx). 
This allows downstream processing to distinguish between successful and failed 
predictions without propagating exceptions. Format depends on output type: for 
STRING use plain text (e.g. <code class="highlighter-rouge">'FAILED'</code>); 
for numeric types use string representation (e.g. <code 
class="highlighter-rouge">'-1'</code>); for ARRAY t [...]
+        </tr>
         <tr>
             <td><h5>endpoint</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -74,6 +80,12 @@
             <td>Duration</td>
             <td>Interval between health check requests. Shorter intervals 
provide faster failure detection but increase server load. Defaults to 30 
seconds. Only effective when health-check-enabled is true.</td>
         </tr>
+        <tr>
+            <td><h5>max-retries</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Integer</td>
+            <td>Maximum number of retries (additional attempts beyond the 
first) for failed inference requests. With max-retries=2 the request will be 
attempted up to 3 times in total (1 initial attempt + 2 retries). When set to 0 
(default), no retry is performed. Only transient failures are retried: network 
errors and 5xx responses. Client-side 4xx errors, response parsing failures, 
and circuit breaker OPEN failures are never retried because they indicate a 
persistent condition. Must be [...]
+        </tr>
         <tr>
             <td><h5>model-name</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -92,6 +104,18 @@
             <td>Integer</td>
             <td>Request priority level (0-255). Higher values indicate higher 
priority.</td>
         </tr>
+        <tr>
+            <td><h5>retry-initial-backoff</h5></td>
+            <td style="word-wrap: break-word;">100 ms</td>
+            <td>Duration</td>
+            <td>Initial backoff duration between retry attempts. Uses 
exponential backoff with equal jitter: the nominal delay is initial-backoff * 
2^attempt (first retry waits this duration, second retry waits 2x, third waits 
4x, and so on), clamped to retry-max-backoff, then randomized in the range 
[delay/2, delay] to prevent a thundering herd of concurrent retries hitting the 
server at the exact same instant. Defaults to 100ms. Must be &gt; 0.</td>
+        </tr>
+        <tr>
+            <td><h5>retry-max-backoff</h5></td>
+            <td style="word-wrap: break-word;">30 s</td>
+            <td>Duration</td>
+            <td>Upper bound on the delay between retry attempts. Exponential 
backoff computed from retry-initial-backoff is clamped to this value so that a 
misconfigured max-retries cannot produce hours-long sleeps or overflow the 
delay computation. Defaults to 30s. Must be &gt;= retry-initial-backoff.</td>
+        </tr>
         <tr>
             <td><h5>sequence-end</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/flink-models/flink-model-triton/pom.xml 
b/flink-models/flink-model-triton/pom.xml
index 4af2424a9bf..c3e14fe4317 100644
--- a/flink-models/flink-model-triton/pom.xml
+++ b/flink-models/flink-model-triton/pom.xml
@@ -113,6 +113,17 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- FactoryMocks (createModelContext) is published in the 
table-common test-jar; we
+                        use it to construct AbstractTritonModelFunction in 
unit tests without standing up a
+                        full TableEnvironment. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients</artifactId>
diff --git 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
index 916ffeda928..47d4ad15c41 100644
--- 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
+++ 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/AbstractTritonModelFunction.java
@@ -36,6 +36,9 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -66,6 +69,15 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
     protected transient TritonCircuitBreaker circuitBreaker;
     protected transient TritonHealthChecker healthChecker;
 
+    /**
+     * Scheduler used to delay retry attempts with exponential backoff. Owned 
by this function so
+     * that pending retries are cancelled on {@link #close()} — relying on 
{@link
+     * java.util.concurrent.CompletableFuture#delayedExecutor(long, TimeUnit)} 
(which is backed by
+     * {@link java.util.concurrent.ForkJoinPool#commonPool()}) would leak 
scheduled tasks past the
+     * operator lifecycle and risk firing HTTP calls against an 
already-released client.
+     */
+    protected transient ScheduledExecutorService retryScheduler;
+
     private final String endpoint;
 
     /**
@@ -98,6 +110,10 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
     private final String compression;
     private final String authToken;
     private final Map<String, String> customHeaders;
+    private final int maxRetries;
+    private final Duration retryInitialBackoff;
+    private final Duration retryMaxBackoff;
+    private final String defaultValue;
 
     // Health check and circuit breaker configuration
     private final boolean healthCheckEnabled;
@@ -122,6 +138,42 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
         this.compression = config.get(TritonOptions.COMPRESSION);
         this.authToken = config.get(TritonOptions.AUTH_TOKEN);
         this.customHeaders = config.get(TritonOptions.CUSTOM_HEADERS);
+        this.maxRetries = config.get(TritonOptions.MAX_RETRIES);
+        this.retryInitialBackoff = 
config.get(TritonOptions.RETRY_INITIAL_BACKOFF);
+        this.retryMaxBackoff = config.get(TritonOptions.RETRY_MAX_BACKOFF);
+        this.defaultValue = config.get(TritonOptions.DEFAULT_VALUE);
+
+        // Fail fast on invalid retry configuration. These are user-supplied 
values so we surface
+        // misconfiguration as IllegalArgumentException before the function is 
ever opened,
+        // instead of producing undefined behaviour (e.g. 1L << -1) at runtime.
+        Preconditions.checkArgument(
+                maxRetries >= 0,
+                "%s must be >= 0, got %s",
+                TritonOptions.MAX_RETRIES.key(),
+                maxRetries);
+        // Backoff durations only matter when retries are actually enabled. 
Validating them when
+        // maxRetries == 0 would reject configurations that are otherwise 
meaningless (a user who
+        // explicitly disables retries with max-retries=0 should not have to 
also keep the
+        // backoff durations in a valid range, since they will never be 
consulted).
+        if (maxRetries > 0) {
+            Preconditions.checkArgument(
+                    retryInitialBackoff != null
+                            && !retryInitialBackoff.isNegative()
+                            && !retryInitialBackoff.isZero(),
+                    "%s must be a positive duration, got %s",
+                    TritonOptions.RETRY_INITIAL_BACKOFF.key(),
+                    retryInitialBackoff);
+            Preconditions.checkArgument(
+                    retryMaxBackoff != null
+                            && !retryMaxBackoff.isNegative()
+                            && !retryMaxBackoff.isZero()
+                            && retryMaxBackoff.compareTo(retryInitialBackoff) 
>= 0,
+                    "%s must be a positive duration >= %s (%s), got %s",
+                    TritonOptions.RETRY_MAX_BACKOFF.key(),
+                    TritonOptions.RETRY_INITIAL_BACKOFF.key(),
+                    retryInitialBackoff,
+                    retryMaxBackoff);
+        }
 
         // Health check and circuit breaker configuration
         this.healthCheckEnabled = 
config.get(TritonOptions.HEALTH_CHECK_ENABLED);
@@ -143,6 +195,28 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
         LOG.debug("Creating Triton HTTP client.");
         this.httpClient = TritonUtils.createHttpClient(timeout.toMillis());
 
+        // Provision a private single-thread scheduler for delayed retries so 
that retry tasks are
+        // bound to this operator's lifecycle. Previously 
CompletableFuture.delayedExecutor was
+        // used, but it schedules on the shared ForkJoinPool.commonPool() 
whose tasks are not
+        // cancellable from close() and would happily fire an HTTP call with 
an already-released
+        // client. Only allocate when retries are actually enabled to avoid 
idle threads.
+        if (maxRetries > 0) {
+            // Single-thread executor → the thread name does not need an index 
suffix; a fixed
+            // suffix would always be "-0" and carry no diagnostic value. The 
subtask index IS
+            // included so that thread dumps from a parallelism>1 deployment 
can be attributed
+            // to a specific subtask rather than aliasing every parallel 
instance under the same
+            // "triton-retry-scheduler-<model>" name.
+            final int subtaskIndex = 
context.getTaskInfo().getIndexOfThisSubtask();
+            final String threadName = "triton-retry-scheduler-" + modelName + 
"-" + subtaskIndex;
+            this.retryScheduler =
+                    Executors.newSingleThreadScheduledExecutor(
+                            r -> {
+                                Thread t = new Thread(r, threadName);
+                                t.setDaemon(true);
+                                return t;
+                            });
+        }
+
         // Initialize circuit breaker if enabled
         if (circuitBreakerEnabled) {
             LOG.info(
@@ -232,6 +306,35 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
         // Release circuit breaker (no-op, just drop the reference).
         this.circuitBreaker = null;
 
+        // Shut down the retry scheduler before releasing the HTTP client so 
that any pending
+        // retry tasks are cancelled and cannot fire a request through a 
client that is about to
+        // be released back to the shared pool. shutdownNow() is safe here: a 
cancelled retry
+        // simply means the caller sees the original inference failure rather 
than a spurious
+        // post-close one.
+        if (this.retryScheduler != null) {
+            LOG.debug("Shutting down Triton retry scheduler for {}", 
loggedEndpoint);
+            try {
+                this.retryScheduler.shutdownNow();
+                // Best-effort await so in-flight retry tasks observe the 
interrupt before the
+                // client is closed. A short timeout is fine because retries 
only schedule a
+                // single short-lived Runnable.
+                if (!this.retryScheduler.awaitTermination(1, 
TimeUnit.SECONDS)) {
+                    LOG.warn(
+                            "Triton retry scheduler did not terminate within 
1s for {}",
+                            loggedEndpoint);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                if (firstFailure == null) {
+                    firstFailure = e;
+                } else {
+                    firstFailure.addSuppressed(e);
+                }
+            } finally {
+                this.retryScheduler = null;
+            }
+        }
+
         // Release HTTP client last so it's always attempted even if earlier 
steps threw.
         if (this.httpClient != null) {
             LOG.debug("Releasing Triton HTTP client.");
@@ -366,12 +469,12 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
                     "%s column '%s' has nested array type: %s\n"
                             + "Multi-dimensional tensors (ARRAY<ARRAY<T>>) are 
not supported in v1.\n"
                             + "=== Supported Types ===\n"
-                            + "  • Scalars: INT, BIGINT, FLOAT, DOUBLE, 
BOOLEAN, STRING\n"
-                            + "  • 1-D Arrays: ARRAY<INT>, ARRAY<FLOAT>, 
ARRAY<DOUBLE>, etc.\n"
+                            + "  - Scalars: INT, BIGINT, FLOAT, DOUBLE, 
BOOLEAN, STRING\n"
+                            + "  - 1-D Arrays: ARRAY<INT>, ARRAY<FLOAT>, 
ARRAY<DOUBLE>, etc.\n"
                             + "=== Workarounds ===\n"
-                            + "  • Flatten to 1-D array: ARRAY<FLOAT> with 
size = rows * cols\n"
-                            + "  • Use JSON STRING encoding for complex 
structures\n"
-                            + "  • Wait for v2+ which will support ROW<...> 
types",
+                            + "  - Flatten to 1-D array: ARRAY<FLOAT> with 
size = rows * cols\n"
+                            + "  - Use JSON STRING encoding for complex 
structures\n"
+                            + "  - Wait for v2+ which will support ROW<...> 
types",
                     inputOrOutput,
                     columnName,
                     type);
@@ -541,4 +644,20 @@ public abstract class AbstractTritonModelFunction extends 
AsyncPredictFunction {
     protected boolean isCircuitBreakerEnabled() {
         return circuitBreakerEnabled;
     }
+
+    protected int getMaxRetries() {
+        return maxRetries;
+    }
+
+    protected Duration getRetryInitialBackoff() {
+        return retryInitialBackoff;
+    }
+
+    protected Duration getRetryMaxBackoff() {
+        return retryMaxBackoff;
+    }
+
+    protected String getDefaultValue() {
+        return defaultValue;
+    }
 }
diff --git 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
index 7bebfc942c3..46bd0e0a96c 100644
--- 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
+++ 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonInferenceModelFunction.java
@@ -18,7 +18,9 @@
 package org.apache.flink.model.triton;
 
 import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException;
 import org.apache.flink.model.triton.exception.TritonClientException;
+import org.apache.flink.model.triton.exception.TritonException;
 import org.apache.flink.model.triton.exception.TritonNetworkException;
 import org.apache.flink.model.triton.exception.TritonSchemaException;
 import org.apache.flink.model.triton.exception.TritonServerException;
@@ -28,6 +30,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.factories.ModelProviderFactory;
 import org.apache.flink.table.functions.AsyncPredictFunction;
+import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -52,8 +55,12 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPOutputStream;
 
 /**
@@ -108,6 +115,36 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
     private final String inputName;
     private final String outputName;
 
+    /**
+     * Pre-parsed default-value payload, computed once at {@link 
#open(FunctionContext)} and reused
+     * to build per-call fallback rows. Parsing is a pure function of the 
configured string plus the
+     * output {@link LogicalType}, so caching trades a one-time startup cost 
for zero per-record
+     * parsing work — important because a sustained Triton outage will route 
every record through
+     * the fallback path.
+     *
+     * <p>We deliberately cache only the deserialized <b>field payload</b> (a 
{@code BinaryString},
+     * primitive box, array, or {@code null}) rather than a {@code 
Collection<RowData>}: handing the
+     * same {@code GenericRowData}/{@code List} instance to every failing 
record would create
+     * implicit aliasing across emitted rows, which breaks the "each emitted 
row is independent"
+     * contract that downstream operators (aggregations keeping per-row 
references, changelog
+     * collectors, serializers reusing fields) rely on. Building a fresh {@link
+     * GenericRowData#of(Object...)} plus a fresh {@link 
Collections#singletonList} per fallback is
+     * effectively free and sidesteps that aliasing hazard.
+     *
+     * <p>{@link #defaultValueConfigured} is the single source of truth for 
whether a fallback is
+     * enabled; {@code cachedDefaultPayload} alone is ambiguous because a 
configured {@code
+     * default-value='null'} legitimately parses to {@code null}.
+     */
+    private transient Object cachedDefaultPayload;
+
+    /**
+     * Whether a {@code default-value} fallback is configured. Tracked 
separately from {@link
+     * #cachedDefaultPayload} because the SQL-NULL fallback (configured as the 
literal string {@code
+     * 'null'}) legitimately deserializes to a Java {@code null}, which would 
otherwise be
+     * indistinguishable from "no fallback configured".
+     */
+    private transient boolean defaultValueConfigured;
+
     public TritonInferenceModelFunction(
             ModelProviderFactory.Context factoryContext, ReadableConfig 
config) {
         super(factoryContext, config);
@@ -130,9 +167,61 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
         this.outputName = outputColumn.getName();
     }
 
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+
+        // Eagerly parse the configured default-value so that (a) a malformed 
default-value
+        // surfaces as a job-submission failure instead of masking itself as a 
silent runtime
+        // error on the first failing record, and (b) we pay zero JSON-parse 
cost on the hot
+        // fallback path during a sustained backend outage.
+        if (getDefaultValue() != null) {
+            try {
+                this.cachedDefaultPayload = 
parseDefaultPayload(getDefaultValue());
+                this.defaultValueConfigured = true;
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Failed to parse configured default-value '%s' 
for output type %s. "
+                                        + "Expected formats: STRING → plain 
text; numeric → string "
+                                        + "representation (e.g. '-1'); ARRAY → 
JSON array (e.g. "
+                                        + "'[0.0, 0.0]'); SQL NULL → 'null'.",
+                                getDefaultValue(), outputType),
+                        e);
+            }
+        }
+    }
+
     @Override
     public CompletableFuture<Collection<RowData>> asyncPredict(RowData 
rowData) {
         CompletableFuture<Collection<RowData>> future = new 
CompletableFuture<>();
+        asyncPredictWithRetry(rowData, future, 0);
+        return future;
+    }
+
+    /**
+     * Executes inference request with retry logic and default value fallback.
+     *
+     * @param rowData Input data for inference
+     * @param future The future to complete with result or exception
+     * @param attemptNumber Current attempt number (0-indexed)
+     */
+    private void asyncPredictWithRetry(
+            RowData rowData, CompletableFuture<Collection<RowData>> future, 
int attemptNumber) {
+
+        // If the operator is tearing down, do not schedule/execute further 
retry attempts.
+        // `attemptNumber > 0` means we are on a retry path (a scheduled 
continuation); dropping
+        // it here lets Flink's normal cancellation path clean up the pending 
future instead of
+        // us completing it on a dying thread pool.
+        if (attemptNumber > 0
+                && (Thread.currentThread().isInterrupted()
+                        || retryScheduler == null
+                        || retryScheduler.isShutdown())) {
+            LOG.debug(
+                    "Skipping retry attempt {} because retry scheduler is 
shutting down",
+                    attemptNumber + 1);
+            return;
+        }
 
         try {
             // Check circuit breaker before making request
@@ -189,16 +278,23 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
                                 @Override
                                 public void onFailure(Call call, IOException 
e) {
                                     LOG.error(
-                                            "Triton inference request failed 
due to network error",
-                                            e);
-
-                                    // Record failure with circuit breaker
-                                    recordFailure();
+                                            "Triton inference request failed 
(attempt {}/{}) due to network error: {}",
+                                            attemptNumber + 1,
+                                            getMaxRetries() + 1,
+                                            e.getMessage());
 
                                     // Wrap IOException in 
TritonNetworkException. The sanitized
                                     // URL is used in the user-visible message 
so that basic-auth
                                     // credentials embedded in the endpoint 
cannot leak through
                                     // exception stacks, CI logs, or error 
dashboards.
+                                    //
+                                    // NOTE: We deliberately do NOT call 
recordFailure() here.
+                                    // Retry state is "one logical request, N 
physical attempts",
+                                    // and the circuit breaker counts logical 
failures only. The
+                                    // failure is recorded exactly once in 
handleFailureWithRetry
+                                    // when all retries are exhausted; 
otherwise a 3-retry request
+                                    // that eventually succeeds would be 
counted as 3 failures +
+                                    // 1 success, unfairly biasing the breaker 
toward OPEN.
                                     TritonNetworkException networkException =
                                             new TritonNetworkException(
                                                     String.format(
@@ -207,7 +303,13 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
                                                             loggedUrl, 
e.getMessage()),
                                                     e);
 
-                                    
future.completeExceptionally(networkException);
+                                    handleFailureWithRetry(
+                                            rowData,
+                                            future,
+                                            attemptNumber,
+                                            networkException,
+                                            /* countAsBreakerFailure */ true,
+                                            /* retryable */ true);
                                 }
 
                                 @Override
@@ -215,23 +317,23 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
                                         throws IOException {
                                     try {
                                         if (!response.isSuccessful()) {
-                                            // Record failure for 5xx errors 
(server issues) so
-                                            // they feed into the circuit 
breaker's failure-rate
-                                            // signal. 4xx errors are 
intentionally NOT recorded:
+                                            // Let 
handleErrorResponseWithRetry classify 4xx vs 5xx
+                                            // and route to retry / 
default-value fallback. The
+                                            // circuit breaker is updated 
exactly once per logical
+                                            // request (inside 
handleFailureWithRetry when retries
+                                            // are exhausted); see the comment 
in onFailure() for
+                                            // why per-attempt recording would 
be incorrect.
+                                            //
+                                            // Historical context: 4xx 
responses are intentionally
+                                            // NOT fed into the breaker even 
at final failure -
                                             // they represent client-side 
configuration problems
                                             // (wrong model name, bad shape, 
missing auth) that
                                             // would persist regardless of 
server health. Folding
-                                            // them into the circuit breaker 
would cause a user
-                                            // with a persistent config error 
to force-open the
-                                            // breaker and deny traffic to an 
otherwise healthy
-                                            // server. The trade-off - 4xx 
calls do not contribute
-                                            // to the breaker's denominator - 
is a deliberate
-                                            // choice; the breaker is a 
server-health signal, not
-                                            // a generic error-rate tracker.
-                                            if (response.code() >= 500) {
-                                                recordFailure();
-                                            }
-                                            handleErrorResponse(response, 
future);
+                                            // them into the breaker would 
cause a user with a
+                                            // persistent config error to 
force-open it and deny
+                                            // traffic to an otherwise healthy 
server.
+                                            handleErrorResponseWithRetry(
+                                                    response, rowData, future, 
attemptNumber);
                                             return;
                                         }
 
@@ -243,17 +345,58 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
                                         // user's pipeline.
                                         okhttp3.ResponseBody body = 
response.body();
                                         if (body == null) {
-                                            recordFailure();
-                                            future.completeExceptionally(
+                                            // Treat a missing body like any 
other transient
+                                            // response failure: route through 
the retry / default-
+                                            // value pipeline so the operator 
behaves consistently
+                                            // across 5xx, network errors and 
malformed proxy
+                                            // responses. Circuit-breaker 
accounting happens
+                                            // exactly once in 
handleFailureWithRetry on final
+                                            // exhaustion.
+                                            handleFailureWithRetry(
+                                                    rowData,
+                                                    future,
+                                                    attemptNumber,
                                                     new TritonClientException(
                                                             "Triton response 
has no body for "
                                                                     + loggedUrl
                                                                     + ". This 
typically indicates a misbehaving "
                                                                     + "proxy 
or interceptor between the client and Triton.",
-                                                            response.code()));
+                                                            response.code()),
+                                                    /* countAsBreakerFailure 
*/ true,
+                                                    /* retryable */ true);
+                                            return;
+                                        }
+                                        String responseBody;
+                                        try {
+                                            responseBody = body.string();
+                                        } catch (IOException bodyReadFailure) {
+                                            // Reading the 2xx response body 
can still fail mid-
+                                            // stream (connection reset, proxy 
truncation). Route
+                                            // this through the retry pipeline 
with breaker
+                                            // accounting: it is a 
network-level failure, not a
+                                            // server-side or client-side 
logic error.
+                                            LOG.error(
+                                                    "Failed to read Triton 2xx 
response body (attempt {}/{}): {}",
+                                                    attemptNumber + 1,
+                                                    getMaxRetries() + 1,
+                                                    
bodyReadFailure.getMessage());
+                                            TritonNetworkException wrapped =
+                                                    new TritonNetworkException(
+                                                            String.format(
+                                                                    "Failed to 
read Triton response body from %s: %s. "
+                                                                            + 
"The connection was lost while streaming the response.",
+                                                                    loggedUrl,
+                                                                    
bodyReadFailure.getMessage()),
+                                                            bodyReadFailure);
+                                            handleFailureWithRetry(
+                                                    rowData,
+                                                    future,
+                                                    attemptNumber,
+                                                    wrapped,
+                                                    /* countAsBreakerFailure 
*/ true,
+                                                    /* retryable */ true);
                                             return;
                                         }
-                                        String responseBody = body.string();
                                         Collection<RowData> result =
                                                 
parseInferenceResponse(responseBody);
 
@@ -264,46 +407,360 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
                                     } catch (JsonProcessingException e) {
                                         LOG.error("Failed to parse Triton 
inference response", e);
                                         // Don't record as circuit breaker 
failure - this is a
-                                        // client parsing issue
-                                        future.completeExceptionally(
+                                        // client parsing issue (deterministic 
bug in either our
+                                        // response handling or the server's 
response schema),
+                                        // and retrying cannot help. 
Short-circuit to fallback.
+                                        TritonClientException parseException =
                                                 new TritonClientException(
                                                         "Failed to parse 
Triton response JSON: "
                                                                 + 
e.getMessage()
                                                                 + ". This may 
indicate an incompatible response format.",
-                                                        400));
+                                                        400);
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                parseException,
+                                                /* countAsBreakerFailure */ 
false,
+                                                /* retryable */ false);
                                     } catch (Exception e) {
                                         LOG.error("Failed to process Triton 
inference response", e);
-                                        // Don't record as circuit breaker 
failure - processing
-                                        // error
-                                        future.completeExceptionally(e);
+                                        // Don't record as circuit breaker 
failure - client-side
+                                        // processing error (post-response), 
not a backend health
+                                        // signal. Also not retryable 
(deterministic).
+                                        handleFailureWithRetry(
+                                                rowData,
+                                                future,
+                                                attemptNumber,
+                                                e,
+                                                /* countAsBreakerFailure */ 
false,
+                                                /* retryable */ false);
                                     } finally {
                                         response.close();
                                     }
                                 }
                             });
 
-        } catch (Exception e) {
+        } catch (TritonCircuitBreakerOpenException e) {
+            // Circuit breaker is OPEN - fail fast. The entire point of the 
breaker is to shed
+            // load when the backend is known-bad; retrying (with or without 
backoff) would
+            // defeat that protection and keep hammering an unhealthy server. 
We still honour the
+            // default-value fallback so callers that opted into graceful 
degradation see a
+            // fallback row instead of a propagated exception.
+            //
+            // Breaker accounting rule: "one logical request counts once". The 
first physical
+            // attempt (attemptNumber == 0) never touches the backend when the 
breaker is
+            // already OPEN — it short-circuits here, so recording a failure 
would double-count
+            // against the next real failure and also let a 
healthy-but-shedding breaker
+            // accelerate its own OPEN decisions. However, if we reach here on 
a retry
+            // (attemptNumber > 0), the *original* attempt did produce a 
genuine backend
+            // failure that triggered the retry path; that failure was 
intentionally deferred
+            // to the "retries exhausted" branch of handleFailureWithRetry so 
an eventual
+            // success would not over-count transient blips. Because the 
breaker has since
+            // tripped (likely via concurrent requests), there will be no 
"retries exhausted"
+            // call for this logical request, and without recording here the 
original backend
+            // failure would be silently dropped from breaker statistics.
+            if (attemptNumber > 0) {
+                recordFailure();
+            }
+            LOG.debug(
+                    "Circuit breaker OPEN; skipping retry and routing to 
default-value fallback (if configured)");
+            completeWithDefaultValueOrFail(future, e);
+        } catch (JsonProcessingException | IllegalArgumentException e) {
+            // Deterministic client-side bug (bad input shape, unsupported 
compression algo, etc.).
+            // Not retryable; a standalone client bug should not count against 
the breaker. On a
+            // retry path (attemptNumber > 0) we must still flush the prior 
deferred 5xx/network
+            // failure — see shouldRecordBreakerFailureOnFinalCompletion.
             LOG.error("Failed to build Triton inference request", e);
-            future.completeExceptionally(e);
+            if (attemptNumber > 0) {
+                recordFailure();
+            }
+            completeWithDefaultValueOrFail(future, e);
+        } catch (Exception e) {
+            // Unexpected failure during request dispatch (e.g. dispatcher 
shutdown race). Treat
+            // as a transient infrastructure error: retry if budget remains, 
but do NOT count
+            // against the breaker (the backend never saw this request).
+            LOG.error("Unexpected error dispatching Triton request", e);
+            handleFailureWithRetry(
+                    rowData,
+                    future,
+                    attemptNumber,
+                    e,
+                    /* countAsBreakerFailure */ false,
+                    /* retryable */ true);
         }
+    }
 
-        return future;
+    /**
+     * Handles request failure with retry logic or default value fallback.
+     *
+     * <p>Retry is only attempted for transient failures; circuit-breaker-open 
and unrecoverable
+     * errors are short-circuited by the caller. The circuit breaker is 
updated at most once per
+     * logical request (here, on final exhaustion) and only when the caller 
asserts that the failure
+     * is a backend-health signal — deterministic client-side bugs (JSON parse 
errors, malformed
+     * input) must not poison the breaker for an otherwise-healthy server.
+     *
+     * @param rowData Input data for inference
+     * @param future The future to complete
+     * @param attemptNumber Current attempt number (0-indexed)
+     * @param error The error that caused the failure
+     * @param countAsBreakerFailure Whether this failure should be counted 
against the circuit
+     *     breaker when retries are exhausted. {@code true} for 5xx / network 
errors; {@code false}
+     *     for 4xx / parse / deterministic client bugs.
+     * @param retryable Whether retry should even be attempted. {@code false} 
short-circuits to the
+     *     fallback/fail path immediately (e.g. deterministic client bugs).
+     */
+    private void handleFailureWithRetry(
+            RowData rowData,
+            CompletableFuture<Collection<RowData>> future,
+            int attemptNumber,
+            Throwable error,
+            boolean countAsBreakerFailure,
+            boolean retryable) {
+
+        if (retryable && attemptNumber < getMaxRetries()) {
+            long delayMs = computeBackoffDelayMillis(attemptNumber);
+
+            // +2 = (0-indexed attemptNumber + 1 for the next attempt) + (1 to 
make 1-based for
+            // human-readable logs). Total attempts is also 1-based (N retries 
=> N+1 attempts).
+            int nextAttemptOneBased = attemptNumber + 2;
+            int totalAttempts = getMaxRetries() + 1;
+            LOG.info(
+                    "Retrying Triton inference request (attempt {}/{}) after 
{} ms",
+                    nextAttemptOneBased,
+                    totalAttempts,
+                    delayMs);
+
+            // Schedule retry on the function-owned scheduler so that 
cancellation on close()
+            // takes effect and we never fire an HTTP call against a released 
client.
+            if (retryScheduler == null || retryScheduler.isShutdown()) {
+                // Should not happen while the operator is open with 
maxRetries > 0, but guard
+                // defensively against race with close().
+                LOG.warn("Retry scheduler unavailable; failing request without 
further retries");
+                // Same accounting rule as the "retries exhausted" branch 
below: record once
+                // when the current failure is breaker-worthy OR when a prior 
attempt
+                // deferred its accounting here.
+                if (shouldRecordBreakerFailureOnFinalCompletion(
+                        countAsBreakerFailure, attemptNumber)) {
+                    recordFailure();
+                }
+                completeWithDefaultValueOrFail(future, error);
+                return;
+            }
+            try {
+                retryScheduler.schedule(
+                        () -> asyncPredictWithRetry(rowData, future, 
attemptNumber + 1),
+                        delayMs,
+                        TimeUnit.MILLISECONDS);
+            } catch (RejectedExecutionException rejected) {
+                // TOCTOU: scheduler was alive when we checked above but got 
shutdown between
+                // isShutdown() and schedule() (close() racing against an 
in-flight callback).
+                // Propagating would leave the AsyncIO future uncompleted and 
stall the operator,
+                // so we must complete synchronously here.
+                LOG.warn(
+                        "Retry scheduler rejected task (operator closing?); 
failing request without further retries",
+                        rejected);
+                if (shouldRecordBreakerFailureOnFinalCompletion(
+                        countAsBreakerFailure, attemptNumber)) {
+                    recordFailure();
+                }
+                completeWithDefaultValueOrFail(future, error);
+            }
+        } else {
+            // No more retries (exhausted or not retryable). See
+            // shouldRecordBreakerFailureOnFinalCompletion for the accounting 
rule.
+            if 
(shouldRecordBreakerFailureOnFinalCompletion(countAsBreakerFailure, 
attemptNumber)) {
+                recordFailure();
+            }
+
+            // attemptNumber + 1 is the accurate attempt count: 1 for a 
non-retryable failure
+            // at attempt 0, getMaxRetries()+1 when retries were exhausted.
+            int attemptsMade = attemptNumber + 1;
+            if (defaultValueConfigured) {
+                LOG.warn(
+                        "Triton inference failed after {} attempt(s). 
Returning configured default value. Original error: {}",
+                        attemptsMade,
+                        error.getMessage(),
+                        error);
+            } else {
+                LOG.error(
+                        "Triton inference failed after {} attempt(s). No 
default value configured.",
+                        attemptsMade);
+            }
+            completeWithDefaultValueOrFail(future, error);
+        }
     }
 
     /**
-     * Handles HTTP error responses and creates appropriate typed exceptions.
+     * Whether the final-completion site should invoke {@link 
#recordFailure()} for this logical
+     * request.
+     *
+     * <p>Record when either the current-attempt failure is breaker-worthy 
({@code
+     * countAsBreakerFailure}) or we are on a retry path ({@code attemptNumber 
> 0}), since only
+     * breaker-worthy failures enter the retry path — so {@code attemptNumber 
> 0} implies a prior
+     * deferred breaker-worthy failure still needs accounting.
+     *
+     * <p>Without the {@code attemptNumber > 0} clause, a 5xx on attempt 0 
followed by a
+     * non-retryable failure (4xx / JSON parse / request-build bug) on a retry 
would silently drop
+     * the original 5xx from breaker statistics — the same class of bug 
previously fixed for the
+     * {@link TritonCircuitBreakerOpenException} short-circuit.
      *
-     * @param response The HTTP response with error status
-     * @param future The future to complete exceptionally
-     * @throws IOException If reading response body fails
+     * <p>Package-private + static for direct unit-test coverage of the 
decision rule.
      */
-    private void handleErrorResponse(
-            Response response, CompletableFuture<Collection<RowData>> future) 
throws IOException {
+    static boolean shouldRecordBreakerFailureOnFinalCompletion(
+            boolean countAsBreakerFailure, int attemptNumber) {
+        return countAsBreakerFailure || attemptNumber > 0;
+    }
+
+    /**
+     * Computes the exponential-backoff delay for the given attempt, clamped 
to {@code
+     * retry-max-backoff} to prevent overflow and unreasonably long sleeps 
when {@code max-retries}
+     * is large, and randomized with equal jitter to avoid a thundering herd 
of concurrent retries.
+     *
+     * <p>The nominal delay is {@code base * 2^attempt}, clamped to {@code 
capMs}. With equal
+     * jitter, the returned delay is drawn uniformly from {@code [nominal/2, 
nominal]}: we keep half
+     * of the backoff as a deterministic floor (so the server still gets the 
intended breathing room
+     * before the next burst) and randomize the other half to spread 
concurrent retries across a
+     * time window. Without jitter a cluster of N parallel AsyncIO slots 
failing on the same Triton
+     * outage would all retry at the exact same instant, producing a 
synchronized thundering herd
+     * that defeats the purpose of exponential backoff.
+     *
+     * <p>Package-private + static so that the deterministic portion of the 
algorithm can be
+     * exercised in isolation without standing up a full {@link 
TritonInferenceModelFunction}
+     * fixture. See {@link #computeBackoffWithJitter(long)} for the jitter 
wrapper.
+     *
+     * <p><b>Preconditions (enforced by callers):</b> {@code attemptNumber >= 
0}, {@code baseMs >
+     * 0}, {@code capMs >= baseMs}. These are validated in {@link 
AbstractTritonModelFunction#open}
+     * via {@link org.apache.flink.util.Preconditions}, so direct callers of 
this static helper
+     * (including tests) must uphold the same contract — passing {@code baseMs 
== 0} returns {@code
+     * 0}, but that is a degenerate configuration that the option-validation 
layer rejects.
+     */
+    static long computeBackoffDelayMillis(int attemptNumber, long baseMs, long 
capMs) {
+        // Cap the shift amount defensively. `1L << 62` already exceeds any 
realistic base*cap
+        // product; beyond 62 the Java spec masks the shift amount (mod 64), 
which would silently
+        // wrap and produce bogus delays. Clamp before the shift so the math 
is always monotone.
+        int shift = Math.max(0, Math.min(attemptNumber, 30));
+        long multiplier = 1L << shift;
+
+        // Guard against base * multiplier overflow: if it would overflow, use 
the cap directly.
+        long delay;
+        if (baseMs > 0 && multiplier > Long.MAX_VALUE / baseMs) {
+            delay = capMs;
+        } else {
+            delay = baseMs * multiplier;
+        }
+        return Math.min(delay, capMs);
+    }
+
+    /**
+     * Applies equal jitter to an already-computed deterministic backoff: 
returns a value drawn
+     * uniformly from {@code [nominalDelayMs / 2, nominalDelayMs]}. See {@link
+     * #computeBackoffDelayMillis(int, long, long)} for the rationale.
+     *
+     * <p>Kept separate (and package-private) from the deterministic helper so 
that existing unit
+     * tests can continue to assert exact values for the backoff curve, while 
the randomized
+     * integration still exercises jitter in production paths.
+     *
+     * <p>Uses {@link ThreadLocalRandom} to avoid contention when many AsyncIO 
slots retry in
+     * parallel — a shared {@link java.util.Random} would serialize on its 
internal seed field and
+     * itself become a coordination point under exactly the high-concurrency 
conditions that make
+     * jitter necessary.
+     */
+    static long computeBackoffWithJitter(long nominalDelayMs) {
+        if (nominalDelayMs <= 1L) {
+            // A delay of 0 or 1 ms is already so small that jitter would be 
meaningless and
+            // could introduce negative / zero delays via truncation. Pass 
through.
+            return nominalDelayMs;
+        }
+        long halfDelay = nominalDelayMs / 2L;
+        // nextLong(origin, bound) requires origin < bound; halfDelay >= 1 and 
nominalDelayMs + 1
+        // is strictly greater, so this is safe.
+        return ThreadLocalRandom.current().nextLong(halfDelay, nominalDelayMs 
+ 1L);
+    }
+
+    private long computeBackoffDelayMillis(int attemptNumber) {
+        long nominal =
+                computeBackoffDelayMillis(
+                        attemptNumber,
+                        getRetryInitialBackoff().toMillis(),
+                        getRetryMaxBackoff().toMillis());
+        return computeBackoffWithJitter(nominal);
+    }
+
+    /**
+     * Completes the future with a freshly-built default-value result when 
{@code default-value} is
+     * configured, or with the supplied error otherwise. Centralises the 
"either / or" decision so
+     * that both the retry-exhausted path and the non-retryable (4xx, 
circuit-breaker-open) path
+     * share one implementation.
+     *
+     * <p>A new {@link GenericRowData} / singleton {@link Collection} is built 
per invocation — see
+     * {@link #cachedDefaultPayload} for why sharing a single pre-built 
collection across all
+     * fallback emissions would be unsafe.
+     */
+    private void completeWithDefaultValueOrFail(
+            CompletableFuture<Collection<RowData>> future, Throwable error) {
+        if (defaultValueConfigured) {
+            future.complete(buildDefaultResult(cachedDefaultPayload));
+        } else {
+            future.completeExceptionally(error);
+        }
+    }
+
+    /**
+     * Builds a fresh singleton {@link Collection} wrapping a fresh {@link 
GenericRowData} around
+     * the given payload. Exposed as package-private + static so that the 
"every fallback emits a
+     * distinct instance" invariant can be asserted in isolation — see {@link 
#cachedDefaultPayload}
+     * for why sharing a single pre-built collection across all fallback 
emissions would be unsafe.
+     */
+    static Collection<RowData> buildDefaultResult(Object cachedPayload) {
+        return Collections.singletonList(GenericRowData.of(cachedPayload));
+    }
+
+    /**
+     * Handles HTTP error response with retry logic.
+     *
+     * @param response The HTTP response
+     * @param rowData Input data for inference
+     * @param future The future to complete
+     * @param attemptNumber Current attempt number
+     */
+    private void handleErrorResponseWithRetry(
+            Response response,
+            RowData rowData,
+            CompletableFuture<Collection<RowData>> future,
+            int attemptNumber) {
 
-        String errorBody =
-                response.body() != null ? response.body().string() : "No error 
details provided";
         int statusCode = response.code();
 
+        // Read the error body in its own try/catch so a body-read failure 
does NOT lose the
+        // original HTTP status. Without this guard, an IOException from 
response.body().string()
+        // would propagate out of onResponse() into the catch (Exception e) 
handler, which routes
+        // failures as non-retryable / non-breaker — causing a genuine 5xx 
(which is retryable
+        // and a real backend-health signal) to be silently downgraded.
+        String errorBody;
+        try {
+            errorBody =
+                    response.body() != null
+                            ? response.body().string()
+                            : "No error details provided";
+        } catch (IOException bodyReadFailure) {
+            LOG.warn(
+                    "Failed to read Triton error response body for HTTP {} 
(attempt {}/{}); "
+                            + "preserving status code for routing decision",
+                    statusCode,
+                    attemptNumber + 1,
+                    getMaxRetries() + 1,
+                    bodyReadFailure);
+            errorBody = "<error body unreadable: " + 
bodyReadFailure.getMessage() + ">";
+        }
+
+        LOG.error(
+                "Triton inference failed (attempt {}/{}) with HTTP {}: {}",
+                attemptNumber + 1,
+                getMaxRetries() + 1,
+                statusCode,
+                errorBody);
+
         // Build detailed error message with context
         StringBuilder errorMsg = new StringBuilder();
         errorMsg.append(
@@ -328,72 +785,156 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
                 errorBody.toLowerCase().contains("shape")
                         || errorBody.toLowerCase().contains("dimension");
 
+        TritonException exception;
+
         if (statusCode >= 400 && statusCode < 500) {
             // Client error - user configuration issue
             errorMsg.append("\n=== Troubleshooting (Client Error) ===\n");
 
             if (statusCode == 400) {
-                errorMsg.append("  • Verify input shape matches model's 
config.pbtxt\n");
-                errorMsg.append("  • For scalar: use 
INT/FLOAT/DOUBLE/STRING\n");
-                errorMsg.append("  • For 1-D tensor: use ARRAY<type>\n");
+                errorMsg.append("  - Verify input shape matches model's 
config.pbtxt\n");
+                errorMsg.append("  - For scalar: use 
INT/FLOAT/DOUBLE/STRING\n");
+                errorMsg.append("  - For 1-D tensor: use ARRAY<type>\n");
                 errorMsg.append(
-                        "  • Try flatten-batch-dim=true if model expects [N] 
but gets [1,N]\n");
+                        "  - Try flatten-batch-dim=true if model expects [N] 
but gets [1,N]\n");
 
                 if (isShapeMismatch) {
-                    // Create schema exception for shape mismatches
-                    future.completeExceptionally(
+                    exception =
                             new TritonSchemaException(
                                     errorMsg.toString(),
                                     "See Triton model config.pbtxt",
-                                    String.format("Flink type: %s", 
inputType)));
-                    return;
+                                    String.format("Flink type: %s", 
inputType));
+                } else {
+                    exception = new TritonClientException(errorMsg.toString(), 
statusCode);
                 }
             } else if (statusCode == 404) {
-                errorMsg.append("  • Verify model-name: 
").append(getModelName()).append("\n");
-                errorMsg.append("  • Verify model-version: ")
+                errorMsg.append("  - Verify model-name: 
").append(getModelName()).append("\n");
+                errorMsg.append("  - Verify model-version: ")
                         .append(getModelVersion())
                         .append("\n");
-                errorMsg.append("  • Check model is loaded: GET ")
+                errorMsg.append("  - Check model is loaded: GET ")
                         .append(sanitizedEndpoint)
                         .append("\n");
+                exception = new TritonClientException(errorMsg.toString(), 
statusCode);
             } else if (statusCode == 401 || statusCode == 403) {
-                errorMsg.append("  • Check auth-token configuration\n");
-                errorMsg.append("  • Verify server authentication 
requirements\n");
+                errorMsg.append("  - Check auth-token configuration\n");
+                errorMsg.append("  - Verify server authentication 
requirements\n");
+                exception = new TritonClientException(errorMsg.toString(), 
statusCode);
+            } else {
+                exception = new TritonClientException(errorMsg.toString(), 
statusCode);
             }
 
-            future.completeExceptionally(
-                    new TritonClientException(errorMsg.toString(), 
statusCode));
+            // 4xx is a persistent client-side configuration problem: no 
retry, and a
+            // standalone 4xx is not fed to the breaker (would force-open on 
user misconfig
+            // against an otherwise-healthy server). On a retry path 
(attemptNumber > 0) the
+            // prior 5xx/network failure was deferred; flush it once here — 
same rule as
+            // shouldRecordBreakerFailureOnFinalCompletion. Bypasses 
handleFailureWithRetry
+            // because there's nothing to retry.
+            if (attemptNumber > 0) {
+                recordFailure();
+            }
+            if (defaultValueConfigured) {
+                LOG.warn(
+                        "Client error (HTTP {}). Returning default value. 
Original error: {}",
+                        statusCode,
+                        exception.getMessage(),
+                        exception);
+            } else {
+                // Make the "not retrying, no fallback, propagating exception" 
decision
+                // explicit in the operator log so that operators chasing a 
misconfiguration
+                // don't have to infer it from the absence of a retry log line.
+                LOG.error(
+                        "Client error (HTTP {}) is non-retryable and no 
default-value is configured; "
+                                + "propagating exception to AsyncIO",
+                        statusCode);
+            }
+            completeWithDefaultValueOrFail(future, exception);
 
         } else if (statusCode >= 500 && statusCode < 600) {
-            // Server error - Triton service issue
+            // Server error - Triton service issue - retryable
             errorMsg.append("\n=== Troubleshooting (Server Error) ===\n");
 
             if (statusCode == 500) {
-                errorMsg.append("  • Check Triton server logs for inference 
crash details\n");
-                errorMsg.append("  • Model may have run out of memory\n");
-                errorMsg.append("  • Input data may trigger model bug\n");
+                errorMsg.append("  - Check Triton server logs for inference 
crash details\n");
+                errorMsg.append("  - Model may have run out of memory\n");
+                errorMsg.append("  - Input data may trigger model bug\n");
             } else if (statusCode == 503) {
-                errorMsg.append("  • Server is overloaded or unavailable\n");
-                errorMsg.append("  • This error is retryable with backoff\n");
-                errorMsg.append("  • Consider scaling Triton server 
resources\n");
+                errorMsg.append("  - Server is overloaded or unavailable\n");
+                errorMsg.append("  - This error is retryable with backoff\n");
+                errorMsg.append("  - Consider scaling Triton server 
resources\n");
             } else if (statusCode == 504) {
-                errorMsg.append("  • Inference exceeded gateway timeout\n");
-                errorMsg.append("  • This error is retryable\n");
-                errorMsg.append("  • Consider increasing timeout 
configuration\n");
+                errorMsg.append("  - Inference exceeded gateway timeout\n");
+                errorMsg.append("  - This error is retryable\n");
+                errorMsg.append("  - Consider increasing timeout 
configuration\n");
             }
 
-            future.completeExceptionally(
-                    new TritonServerException(errorMsg.toString(), 
statusCode));
+            exception = new TritonServerException(errorMsg.toString(), 
statusCode);
+            // 5xx is a genuine backend-health signal: count against the 
breaker and retry.
+            handleFailureWithRetry(
+                    rowData,
+                    future,
+                    attemptNumber,
+                    exception,
+                    /* countAsBreakerFailure */ true,
+                    /* retryable */ true);
 
         } else {
             // Unexpected status code
             errorMsg.append("\n=== Unexpected Status Code ===\n");
-            errorMsg.append("  • This status code is not standard for 
Triton\n");
-            errorMsg.append("  • Check if proxy/load balancer is involved\n");
+            errorMsg.append("  - This status code is not standard for 
Triton\n");
+            errorMsg.append("  - Check if proxy/load balancer is involved\n");
+
+            exception = new TritonClientException(errorMsg.toString(), 
statusCode);
+            // Non-standard status - could be a misbehaving proxy. Retry 
transparently but do
+            // not conflate with backend health (the real Triton server never 
produced it).
+            handleFailureWithRetry(
+                    rowData,
+                    future,
+                    attemptNumber,
+                    exception,
+                    /* countAsBreakerFailure */ false,
+                    /* retryable */ true);
+        }
+    }
 
-            future.completeExceptionally(
-                    new TritonClientException(errorMsg.toString(), 
statusCode));
+    /**
+     * Parses the configured default-value string into the single field 
payload that will be wrapped
+     * into a fresh {@link GenericRowData} on every fallback emission.
+     *
+     * <p>Called once in {@link #open(FunctionContext)} so that a malformed 
default-value fails the
+     * job at startup rather than masking the original inference error at 
runtime, and so that the
+     * hot fallback path performs no JSON parsing.
+     *
+     * <p>We return a single payload object (not a {@code 
Collection<RowData>}) because sharing a
+     * pre-built {@link RowData} across every failing record would alias 
mutable row state across
+     * emitted rows — see the field-level Javadoc on {@link 
#cachedDefaultPayload} for details.
+     *
+     * @param defaultValueStr The default-value string to parse
+     * @return The deserialized payload for the single output column (may be 
{@code null} when the
+     *     user configures the literal string {@code 'null'} to request a 
SQL-NULL fallback)
+     * @throws JsonProcessingException If parsing the JSON default value fails
+     */
+    private Object parseDefaultPayload(String defaultValueStr) throws 
JsonProcessingException {
+        // Accept the literal string 'null' as an explicit SQL-NULL fallback 
for every output
+        // type. Users who leave the option unset disable the fallback 
entirely (exceptions are
+        // propagated), so we need an affirmative way to say "fall back to 
NULL". Handling it
+        // uniformly here avoids the surprising difference between STRING 
(explicitly special-
+        // cased) and ARRAY/numeric (where `objectMapper.readTree("null")` 
returned a NullNode
+        // whose downstream mapping was implicit and undocumented).
+        //
+        // NOTE: As a consequence, a VARCHAR model cannot use the literal 
string "null" (lower-
+        // case) as a non-null failure sentinel - it will always be 
interpreted as SQL NULL.
+        // Users who need that exact sentinel should choose a different marker 
(e.g. "NULL",
+        // "FAILED", "<null>"); this trade-off is documented in 
TritonOptions.DEFAULT_VALUE.
+        if ("null".equals(defaultValueStr)) {
+            return null;
+        }
+        if (outputType instanceof VarCharType) {
+            return BinaryStringData.fromString(defaultValueStr);
         }
+        // Array and other scalar types - parse JSON.
+        JsonNode jsonNode = objectMapper.readTree(defaultValueStr);
+        return TritonTypeMapper.deserializeFromJson(jsonNode, outputType);
     }
 
     private String buildInferenceRequest(RowData rowData) throws 
JsonProcessingException {
@@ -511,8 +1052,21 @@ public class TritonInferenceModelFunction extends 
AbstractTritonModelFunction {
             LOG.warn("No outputs found in Triton response");
         }
 
-        // If no outputs found, return default value based on type
+        // If no outputs were produced, prefer the user-configured default 
value when available so
+        // that downstream operators see the same fallback payload as on 
retry-exhausted and
+        // breaker-open paths. Only when no default-value is configured do we 
fall back to the
+        // previous "type-specific empty sentinel" behaviour — but in that 
case we also warn,
+        // because an empty / null row can easily masquerade as a successful 
prediction and
+        // propagate silently through aggregations.
         if (results.isEmpty()) {
+            if (defaultValueConfigured) {
+                LOG.warn("Triton response contained no outputs; emitting 
configured default value");
+                return buildDefaultResult(cachedDefaultPayload);
+            }
+            LOG.warn(
+                    "Triton response contained no outputs and no default-value 
is configured; "
+                            + "emitting a type-specific empty sentinel. 
Configure default-value "
+                            + "to get a more explicit fallback.");
             Object defaultValue;
             if (outputType instanceof VarCharType) {
                 defaultValue = BinaryStringData.EMPTY_UTF8;
diff --git 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
index 516ec899b76..4fd6139a245 100644
--- 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
+++ 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonModelProviderFactory.java
@@ -70,6 +70,11 @@ public class TritonModelProviderFactory implements 
ModelProviderFactory {
         set.add(TritonOptions.COMPRESSION);
         set.add(TritonOptions.AUTH_TOKEN);
         set.add(TritonOptions.CUSTOM_HEADERS);
+        // Retry and default-value fallback
+        set.add(TritonOptions.MAX_RETRIES);
+        set.add(TritonOptions.RETRY_INITIAL_BACKOFF);
+        set.add(TritonOptions.RETRY_MAX_BACKOFF);
+        set.add(TritonOptions.DEFAULT_VALUE);
         return set;
     }
 
diff --git 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
index f28a27a8b8b..d8831574c73 100644
--- 
a/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
+++ 
b/flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonOptions.java
@@ -253,4 +253,89 @@ public class TritonOptions {
                             "Number of successful test requests required in 
HALF_OPEN state to close the circuit. "
                                     + "If any request fails in HALF_OPEN 
state, the circuit immediately reopens. "
                                     + "Defaults to 3 requests. Only effective 
when circuit-breaker-enabled is true.");
+
+    // ==================== Retry and Default Value Fallback Options 
====================
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Integer> MAX_RETRIES =
+            ConfigOptions.key("max-retries")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Maximum number of retries (additional attempts 
beyond the first) "
+                                    + "for failed inference requests. With 
max-retries=2 the "
+                                    + "request will be attempted up to 3 times 
in total "
+                                    + "(1 initial attempt + 2 retries). When 
set to 0 (default), "
+                                    + "no retry is performed. Only transient 
failures are "
+                                    + "retried: network errors and 5xx 
responses. Client-side "
+                                    + "4xx errors, response parsing failures, 
and circuit "
+                                    + "breaker OPEN failures are never retried 
because they "
+                                    + "indicate a persistent condition. Must 
be >= 0.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> RETRY_INITIAL_BACKOFF =
+            ConfigOptions.key("retry-initial-backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(100))
+                    .withDescription(
+                            "Initial backoff duration between retry attempts. 
Uses exponential "
+                                    + "backoff with equal jitter: the nominal 
delay is "
+                                    + "initial-backoff * 2^attempt (first 
retry waits this "
+                                    + "duration, second retry waits 2x, third 
waits 4x, and so "
+                                    + "on), clamped to retry-max-backoff, then 
randomized in "
+                                    + "the range [delay/2, delay] to prevent a 
thundering herd "
+                                    + "of concurrent retries hitting the 
server at the exact "
+                                    + "same instant. Defaults to 100ms. Must 
be > 0.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<Duration> RETRY_MAX_BACKOFF =
+            ConfigOptions.key("retry-max-backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "Upper bound on the delay between retry attempts. 
Exponential "
+                                    + "backoff computed from 
retry-initial-backoff is clamped "
+                                    + "to this value so that a misconfigured 
max-retries "
+                                    + "cannot produce hours-long sleeps or 
overflow the delay "
+                                    + "computation. Defaults to 30s. Must be 
>= "
+                                    + "retry-initial-backoff.");
+
+    @Documentation.Section({Documentation.Sections.MODEL_TRITON_ADVANCED})
+    public static final ConfigOption<String> DEFAULT_VALUE =
+            ConfigOptions.key("default-value")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Fallback value to return when all 
retry attempts "
+                                                    + "fail (transient errors) 
or when the request "
+                                                    + "fails with a 
non-retryable error (4xx). "
+                                                    + "This allows downstream 
processing to "
+                                                    + "distinguish between 
successful and failed "
+                                                    + "predictions without 
propagating exceptions. "
+                                                    + "Format depends on 
output type: for STRING "
+                                                    + "use plain text (e.g. 
%s); for numeric types "
+                                                    + "use string 
representation (e.g. %s); for "
+                                                    + "ARRAY types use JSON 
array format (e.g. %s); "
+                                                    + "for SQL NULL use the 
literal %s. "
+                                                    + "Note: the lower-case 
literal %s is ALWAYS "
+                                                    + "interpreted as SQL NULL 
and cannot be used "
+                                                    + "as a STRING sentinel; 
if you need a "
+                                                    + "string-typed sentinel 
indicating failure, "
+                                                    + "use %s, %s or %s 
instead. The value "
+                                                    + "is parsed once at 
operator initialization; "
+                                                    + "an unparseable value 
fails the job at "
+                                                    + "startup rather than at 
the first error. "
+                                                    + "If not specified, 
exceptions are thrown on "
+                                                    + "failure.",
+                                            code("'FAILED'"),
+                                            code("'-1'"),
+                                            code("'[0.0, 0.0]'"),
+                                            code("'null'"),
+                                            code("'null'"),
+                                            code("'NULL'"),
+                                            code("'FAILED'"),
+                                            code("'<null>'"))
+                                    .build());
 }
diff --git 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonBreakerFailureAccountingTest.java
 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonBreakerFailureAccountingTest.java
new file mode 100644
index 00000000000..722644bfd66
--- /dev/null
+++ 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonBreakerFailureAccountingTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link
+ * 
TritonInferenceModelFunction#shouldRecordBreakerFailureOnFinalCompletion(boolean,
 int)}.
+ *
+ * <p>Guards the "one logical request, one breaker entry" contract: a 
breaker-worthy failure
+ * deferred on attempt 0 must be flushed even when the retry lands on a 
non-breaker-worthy branch
+ * (4xx / parse / request-build bug).
+ */
+class TritonBreakerFailureAccountingTest {
+
+    @Test
+    void testStandaloneBreakerWorthyFailureIsRecorded() {
+        // 5xx / network on first attempt with no retries: record directly.
+        assertThat(
+                        
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+                                true, 0))
+                .isTrue();
+    }
+
+    @Test
+    void testStandaloneNonBreakerFailureIsNotRecorded() {
+        // 4xx / parse error on attempt 0: not a backend-health signal.
+        assertThat(
+                        
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+                                false, 0))
+                .isFalse();
+    }
+
+    @Test
+    void testExhaustedRetriesStillRecord() {
+        // Multiple retries all breaker-worthy: record once.
+        assertThat(
+                        
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+                                true, 3))
+                .isTrue();
+    }
+
+    @Test
+    void testRetryLandingOnNonRetryableStillRecordsDeferred() {
+        // Regression for the bug this fix addresses: 5xx on attempt 0 → retry 
→ attempt 1
+        // fails with 4xx / parse error. countAsBreakerFailure=false for the 
final attempt, but
+        // the prior 5xx was deferred and must still be recorded.
+        assertThat(
+                        
TritonInferenceModelFunction.shouldRecordBreakerFailureOnFinalCompletion(
+                                false, 1))
+                .isTrue();
+    }
+}
diff --git 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonDefaultValueFallbackTest.java
 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonDefaultValueFallbackTest.java
new file mode 100644
index 00000000000..1befc12c743
--- /dev/null
+++ 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonDefaultValueFallbackTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link 
TritonInferenceModelFunction#buildDefaultResult(Object)}.
+ *
+ * <p>Regression guard for a subtle correctness bug: a previous implementation 
cached a single
+ * {@code Collection<RowData>} (wrapping a single {@code GenericRowData}) and 
handed that same
+ * instance to every failing record. Because downstream operators may retain 
references to the
+ * emitted rows (keyed aggregations, changelog collectors, serializers that 
reuse field storage),
+ * aliasing a single row across every fallback emission silently violates the 
"each emitted row is
+ * independent" contract.
+ */
+class TritonDefaultValueFallbackTest {
+
+    @Test
+    void testEachCallReturnsFreshCollection() {
+        // Aliasing the same Collection across fallbacks would let a 
downstream operator that
+        // (legitimately) mutates its input collection (e.g. addAll / clear on 
the list it
+        // receives) corrupt future fallback emissions. Ensure every call 
produces a distinct
+        // container.
+        Object payload = BinaryStringData.fromString("FAILED");
+        Collection<RowData> first = 
TritonInferenceModelFunction.buildDefaultResult(payload);
+        Collection<RowData> second = 
TritonInferenceModelFunction.buildDefaultResult(payload);
+
+        assertThat(first).isNotSameAs(second);
+    }
+
+    @Test
+    void testEachCallReturnsFreshRow() {
+        // GenericRowData carries mutable RowKind plus a mutable field array; 
aliasing a single
+        // row into every emitted record would leak RowKind changes 
(insert/update/delete) and
+        // any per-record field overwrites back into the cached fallback, 
poisoning subsequent
+        // emissions. Assert that the row instance itself is distinct per call.
+        Object payload = BinaryStringData.fromString("FAILED");
+        RowData first = 
firstRow(TritonInferenceModelFunction.buildDefaultResult(payload));
+        RowData second = 
firstRow(TritonInferenceModelFunction.buildDefaultResult(payload));
+
+        assertThat(first).isNotSameAs(second);
+    }
+
+    @Test
+    void testNullPayloadProducesSqlNullField() {
+        // When the user configures `default-value='null'`, 
parseDefaultPayload returns a Java
+        // null. Wrapping that in GenericRowData must produce a single-column 
row whose only
+        // field reads back as SQL NULL - the "no fallback configured" 
semantics must be carried
+        // by defaultValueConfigured, not by the payload value itself.
+        Collection<RowData> result = 
TritonInferenceModelFunction.buildDefaultResult(null);
+        RowData row = firstRow(result);
+
+        assertThat(row.getArity()).isEqualTo(1);
+        assertThat(row.isNullAt(0)).isTrue();
+    }
+
+    @Test
+    void testDownstreamMutationOfPriorResultDoesNotAffectNextResult() {
+        // End-to-end assertion of the non-aliasing invariant: mutating the 
row emitted by call
+        // N (here by overwriting its field and flipping RowKind) must leave 
call N+1 pristine.
+        // Triton's fallback contract is "each record sees a clean default 
row".
+        Object payload = BinaryStringData.fromString("FAILED");
+        Collection<RowData> first = 
TritonInferenceModelFunction.buildDefaultResult(payload);
+        GenericRowData firstRow = (GenericRowData) firstRow(first);
+        firstRow.setField(0, BinaryStringData.fromString("HIJACKED"));
+        firstRow.setRowKind(org.apache.flink.types.RowKind.DELETE);
+
+        RowData second = 
firstRow(TritonInferenceModelFunction.buildDefaultResult(payload));
+        assertThat(second.getString(0).toString()).isEqualTo("FAILED");
+        
assertThat(second.getRowKind()).isEqualTo(org.apache.flink.types.RowKind.INSERT);
+    }
+
+    private static RowData firstRow(Collection<RowData> collection) {
+        assertThat(collection).hasSize(1);
+        Iterator<RowData> it = collection.iterator();
+        return it.next();
+    }
+}
diff --git 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
index 28f98fd0308..7fab9ce7446 100644
--- 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
+++ 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonModelProviderFactoryTest.java
@@ -42,7 +42,7 @@ class TritonModelProviderFactoryTest {
     void testOptionalOptions() {
         TritonModelProviderFactory factory = new TritonModelProviderFactory();
         assertThat(factory.optionalOptions())
-                .hasSize(10)
+                .hasSize(14)
                 .containsExactlyInAnyOrder(
                         TritonOptions.MODEL_VERSION,
                         TritonOptions.TIMEOUT,
@@ -53,6 +53,10 @@ class TritonModelProviderFactoryTest {
                         TritonOptions.SEQUENCE_END,
                         TritonOptions.COMPRESSION,
                         TritonOptions.AUTH_TOKEN,
-                        TritonOptions.CUSTOM_HEADERS);
+                        TritonOptions.CUSTOM_HEADERS,
+                        TritonOptions.MAX_RETRIES,
+                        TritonOptions.RETRY_INITIAL_BACKOFF,
+                        TritonOptions.RETRY_MAX_BACKOFF,
+                        TritonOptions.DEFAULT_VALUE);
     }
 }
diff --git 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryBackoffTest.java
 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryBackoffTest.java
new file mode 100644
index 00000000000..f5f5dd7339d
--- /dev/null
+++ 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryBackoffTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link 
TritonInferenceModelFunction#computeBackoffDelayMillis(int, long, long)}.
+ *
+ * <p>The retry backoff algorithm has several boundary conditions that are 
easy to regress on (shift
+ * overflow when attempt is very large, multiplication overflow when base*2^n 
wraps negative, cap
+ * monotonicity). Exercising it here without the full operator fixture keeps 
the tests fast and the
+ * failure modes sharply diagnosable.
+ */
+class TritonRetryBackoffTest {
+
+    @Test
+    void testFirstAttemptReturnsBase() {
+        // attempt 0 corresponds to "first retry waits base" in the docstring.
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(0, 
100L, 30_000L))
+                .isEqualTo(100L);
+    }
+
+    @Test
+    void testExponentialDoubling() {
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(1, 
100L, 30_000L))
+                .isEqualTo(200L);
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(2, 
100L, 30_000L))
+                .isEqualTo(400L);
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(3, 
100L, 30_000L))
+                .isEqualTo(800L);
+    }
+
+    @Test
+    void testCapIsEnforced() {
+        // 100ms * 2^10 = ~102s, capped to 30s.
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(10, 
100L, 30_000L))
+                .isEqualTo(30_000L);
+    }
+
+    @Test
+    void testLargeAttemptNumberDoesNotOverflow() {
+        // Java masks the shift amount mod 64, so without the clamp this would 
wrap and produce
+        // an incorrect delay. The output must still equal the cap.
+        long cap = 30_000L;
+        
assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(1000, 100L, 
cap))
+                .isEqualTo(cap);
+        assertThat(
+                        TritonInferenceModelFunction.computeBackoffDelayMillis(
+                                Integer.MAX_VALUE, 100L, cap))
+                .isEqualTo(cap);
+    }
+
+    @Test
+    void testBaseTimesMultiplierOverflowFallsBackToCap() {
+        // Long.MAX_VALUE * 2 would overflow; result must be the cap, not a 
negative number.
+        long base = Long.MAX_VALUE / 2;
+        long cap = 1_000L;
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(5, 
base, cap))
+                .isEqualTo(cap);
+    }
+
+    @Test
+    void testNegativeAttemptNumberIsTreatedAsZero() {
+        // Defensive clamp: a negative attempt counter (should be impossible 
in production but
+        // cheap to guard) must not trigger `1L << -1` (which equals `1L << 
63`).
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(-1, 
100L, 30_000L))
+                .isEqualTo(100L);
+    }
+
+    @Test
+    void testDelayIsMonotonic() {
+        long base = 50L;
+        long cap = 10_000L;
+        long previous = 0L;
+        for (int attempt = 0; attempt < 20; attempt++) {
+            long delay = 
TritonInferenceModelFunction.computeBackoffDelayMillis(attempt, base, cap);
+            assertThat(delay)
+                    .as("delay must be non-decreasing across attempts 
(attempt=%d)", attempt)
+                    .isGreaterThanOrEqualTo(previous);
+            assertThat(delay)
+                    .as("delay must never exceed cap (attempt=%d)", attempt)
+                    .isLessThanOrEqualTo(cap);
+            previous = delay;
+        }
+    }
+
+    @Test
+    void testZeroBaseReturnsZero() {
+        // Degenerate contract: option validation normally forbids baseMs == 
0, but the static
+        // helper must still be total. Any attempt with base=0 produces 0 (0 
<< n == 0), and we
+        // explicitly skip the overflow guard because `Long.MAX_VALUE / 0` 
would throw.
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(0, 
0L, 30_000L))
+                .isEqualTo(0L);
+        assertThat(TritonInferenceModelFunction.computeBackoffDelayMillis(5, 
0L, 30_000L))
+                .isEqualTo(0L);
+    }
+
+    @Test
+    void testCapEqualsBaseAlwaysReturnsBase() {
+        // When the user configures retry-initial-backoff == 
retry-max-backoff, the cap clamps
+        // every attempt to the base delay (effectively a fixed-delay retry 
schedule).
+        long fixed = 500L;
+        for (int attempt = 0; attempt < 10; attempt++) {
+            assertThat(
+                            
TritonInferenceModelFunction.computeBackoffDelayMillis(
+                                    attempt, fixed, fixed))
+                    .as("cap == base collapses to fixed delay (attempt=%d)", 
attempt)
+                    .isEqualTo(fixed);
+        }
+    }
+
+    @Test
+    void testJitterStaysWithinEqualJitterWindow() {
+        // Equal jitter: the returned value must land in [nominal/2, nominal] 
inclusive. A strict
+        // probabilistic bound check over many samples is a cheap, non-flaky 
way to verify the
+        // window without asserting the exact distribution.
+        long nominal = 1000L;
+        for (int i = 0; i < 10_000; i++) {
+            long jittered = 
TritonInferenceModelFunction.computeBackoffWithJitter(nominal);
+            assertThat(jittered)
+                    .as("jitter must not fall below nominal/2 (iteration=%d)", 
i)
+                    .isGreaterThanOrEqualTo(nominal / 2L);
+            assertThat(jittered)
+                    .as("jitter must not exceed nominal (iteration=%d)", i)
+                    .isLessThanOrEqualTo(nominal);
+        }
+    }
+
+    @Test
+    void testJitterProducesSpreadToAvoidThunderingHerd() {
+        // A deterministic computeBackoffWithJitter would defeat the whole 
point of jitter and
+        // cause a thundering herd. Sample a moderate number of draws and 
require at least a few
+        // distinct values — with a uniform integer range of ~500 values the 
probability of all
+        // 100 draws being identical is astronomically small (< 2^-800).
+        long nominal = 1000L;
+        java.util.Set<Long> distinct = new java.util.HashSet<>();
+        for (int i = 0; i < 100; i++) {
+            
distinct.add(TritonInferenceModelFunction.computeBackoffWithJitter(nominal));
+        }
+        assertThat(distinct)
+                .as("jitter must introduce variation across calls, got only 
%s", distinct)
+                .hasSizeGreaterThan(1);
+    }
+
+    @Test
+    void testJitterPassesThroughTrivialDelays() {
+        // 0 and 1 ms delays are too small for meaningful jitter and must pass 
through unchanged
+        // so that a degenerate config does not accidentally return a negative 
or truncated
+        // value.
+        
assertThat(TritonInferenceModelFunction.computeBackoffWithJitter(0L)).isEqualTo(0L);
+        
assertThat(TritonInferenceModelFunction.computeBackoffWithJitter(1L)).isEqualTo(1L);
+    }
+}
diff --git 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryConfigValidationTest.java
 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryConfigValidationTest.java
new file mode 100644
index 00000000000..28b131c2bd1
--- /dev/null
+++ 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryConfigValidationTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.ModelProviderFactory;
+import org.apache.flink.table.factories.utils.FactoryMocks;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Validates that retry-config validation in {@link 
AbstractTritonModelFunction}'s constructor is
+ * gated on whether retries are actually enabled.
+ *
+ * <p>Background: previously the backoff-duration checks ran unconditionally, 
so a user who
+ * explicitly disabled retries with {@code max-retries=0} would still be 
rejected if their
+ * (otherwise unused) backoff settings were degenerate. Backoff durations only 
matter when {@code
+ * maxRetries > 0}, so validating them with retries disabled is wrong.
+ */
+class TritonRetryConfigValidationTest {
+
+    private static final ResolvedSchema INPUT_SCHEMA =
+            ResolvedSchema.of(Column.physical("input", DataTypes.STRING()));
+
+    private static final ResolvedSchema OUTPUT_SCHEMA =
+            ResolvedSchema.of(Column.physical("output", DataTypes.STRING()));
+
+    @Test
+    void testZeroRetriesSkipsBackoffValidation() {
+        // With retries disabled, retry-initial-backoff=0ms is meaningless but 
harmless: the
+        // backoff is never consulted. The constructor must accept it instead 
of failing the job
+        // submission for an unused setting.
+        Map<String, String> options = baseOptions();
+        options.put(TritonOptions.MAX_RETRIES.key(), "0");
+        options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "0 ms");
+
+        ModelProviderFactory.Context context =
+                FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA, 
options);
+
+        // Construction must succeed; previously this threw 
IllegalArgumentException.
+        TritonInferenceModelFunction function =
+                new TritonInferenceModelFunction(context, 
contextConfig(context));
+        assertThat(function).isNotNull();
+    }
+
+    @Test
+    void testZeroRetriesSkipsMaxBackoffValidation() {
+        // Same rationale as above for retry-max-backoff. Note we also set the 
initial backoff
+        // to a positive value to isolate the max-backoff check.
+        Map<String, String> options = baseOptions();
+        options.put(TritonOptions.MAX_RETRIES.key(), "0");
+        options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "100 ms");
+        options.put(TritonOptions.RETRY_MAX_BACKOFF.key(), "0 ms");
+
+        ModelProviderFactory.Context context =
+                FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA, 
options);
+
+        TritonInferenceModelFunction function =
+                new TritonInferenceModelFunction(context, 
contextConfig(context));
+        assertThat(function).isNotNull();
+    }
+
+    @Test
+    void testRetriesEnabledStillRejectsZeroBackoff() {
+        // Regression guard for the gating: as soon as retries are enabled, 
backoff validation
+        // must run. Otherwise a misconfigured initial-backoff would silently 
produce zero-delay
+        // retry storms in production.
+        Map<String, String> options = baseOptions();
+        options.put(TritonOptions.MAX_RETRIES.key(), "3");
+        options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "0 ms");
+
+        ModelProviderFactory.Context context =
+                FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA, 
options);
+
+        assertThatThrownBy(() -> new TritonInferenceModelFunction(context, 
contextConfig(context)))
+                .isInstanceOf(IllegalArgumentException.class)
+                
.hasMessageContaining(TritonOptions.RETRY_INITIAL_BACKOFF.key());
+    }
+
+    @Test
+    void testRetriesEnabledStillRejectsMaxBackoffSmallerThanInitial() {
+        Map<String, String> options = baseOptions();
+        options.put(TritonOptions.MAX_RETRIES.key(), "3");
+        options.put(TritonOptions.RETRY_INITIAL_BACKOFF.key(), "500 ms");
+        options.put(TritonOptions.RETRY_MAX_BACKOFF.key(), "100 ms");
+
+        ModelProviderFactory.Context context =
+                FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA, 
options);
+
+        assertThatThrownBy(() -> new TritonInferenceModelFunction(context, 
contextConfig(context)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(TritonOptions.RETRY_MAX_BACKOFF.key());
+    }
+
+    @Test
+    void testNegativeMaxRetriesAlwaysRejected() {
+        // The maxRetries >= 0 invariant guards the gate itself; without it, a 
negative value
+        // would skip backoff validation AND blow up downstream (e.g. 1L << -1 
in the backoff
+        // calculation). Verify it's still enforced.
+        Map<String, String> options = baseOptions();
+        options.put(TritonOptions.MAX_RETRIES.key(), "-1");
+
+        ModelProviderFactory.Context context =
+                FactoryMocks.createModelContext(INPUT_SCHEMA, OUTPUT_SCHEMA, 
options);
+
+        assertThatThrownBy(() -> new TritonInferenceModelFunction(context, 
contextConfig(context)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(TritonOptions.MAX_RETRIES.key());
+    }
+
+    private static Map<String, String> baseOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("provider", TritonModelProviderFactory.IDENTIFIER);
+        options.put(TritonOptions.ENDPOINT.key(), "http://localhost:8000";);
+        options.put(TritonOptions.MODEL_NAME.key(), "test-model");
+        return options;
+    }
+
+    private static Configuration contextConfig(ModelProviderFactory.Context 
context) {
+        Configuration config = new Configuration();
+        context.getCatalogModel().getOptions().forEach(config::setString);
+        return config;
+    }
+}
diff --git 
a/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryOptionsTest.java
 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryOptionsTest.java
new file mode 100644
index 00000000000..e9f48616c85
--- /dev/null
+++ 
b/flink-models/flink-model-triton/src/test/java/org/apache/flink/model/triton/TritonRetryOptionsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that the retry / default-value {@link TritonOptions} expose the 
contracts users rely on:
+ * stable option keys (changing them silently breaks pipelines) and the 
documented default values.
+ */
+class TritonRetryOptionsTest {
+
+    @Test
+    void testMaxRetriesKeyAndDefault() {
+        assertThat(TritonOptions.MAX_RETRIES.key()).isEqualTo("max-retries");
+        assertThat(TritonOptions.MAX_RETRIES.defaultValue()).isZero();
+    }
+
+    @Test
+    void testRetryInitialBackoffKeyAndDefault() {
+        
assertThat(TritonOptions.RETRY_INITIAL_BACKOFF.key()).isEqualTo("retry-initial-backoff");
+        assertThat(TritonOptions.RETRY_INITIAL_BACKOFF.defaultValue())
+                .isEqualTo(Duration.ofMillis(100));
+    }
+
+    @Test
+    void testRetryMaxBackoffKeyAndDefault() {
+        
assertThat(TritonOptions.RETRY_MAX_BACKOFF.key()).isEqualTo("retry-max-backoff");
+        assertThat(TritonOptions.RETRY_MAX_BACKOFF.defaultValue())
+                .isEqualTo(Duration.ofSeconds(30));
+    }
+
+    @Test
+    void testDefaultValueKeyAndNoDefault() {
+        
assertThat(TritonOptions.DEFAULT_VALUE.key()).isEqualTo("default-value");
+        // When not configured, default-value must be null so the operator can 
distinguish
+        // "fallback enabled" from "no fallback configured".
+        assertThat(TritonOptions.DEFAULT_VALUE.defaultValue()).isNull();
+    }
+
+    @Test
+    void testRetryInitialBackoffDefaultIsLessThanOrEqualToMax() {
+        // Regression guard for a subtle misconfiguration: if the default of 
RETRY_INITIAL_BACKOFF
+        // ever exceeded RETRY_MAX_BACKOFF's default, every out-of-the-box job 
would fail
+        // validation in AbstractTritonModelFunction's constructor.
+        assertThat(TritonOptions.RETRY_INITIAL_BACKOFF.defaultValue())
+                
.isLessThanOrEqualTo(TritonOptions.RETRY_MAX_BACKOFF.defaultValue());
+    }
+}

Reply via email to