davidradl commented on code in PR #27567:
URL: https://github.com/apache/flink/pull/27567#discussion_r2788455349


##########
flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonCircuitBreaker.java:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.model.triton;
+
+import 
org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Circuit breaker implementation for Triton Inference Server health 
management.
+ *
+ * <p>This circuit breaker follows the classic three-state model to protect 
the system from
+ * cascading failures when the Triton server becomes unhealthy:
+ *
+ * <ul>
+ *   <li><b>CLOSED</b>: Normal operation. Requests are allowed. Tracks failure 
rate.
+ *   <li><b>OPEN</b>: Triton is unhealthy. All requests fail fast without 
hitting the server.
+ *   <li><b>HALF_OPEN</b>: Testing recovery. Limited requests allowed to probe 
server health.
+ * </ul>
+ *
+ * <p><b>State Transitions:</b>
+ *
+ * <pre>
+ *  CLOSED ──[failure rate > threshold]──> OPEN
+ *            ↑                              │
+ *            │                              │ [after timeout]
+ *            │                              ↓
+ *            └──[success count met]── HALF_OPEN
+ * </pre>
+ *
+ * <p><b>Benefits:</b>
+ *
+ * <ul>
+ *   <li>Fail fast when server is down, avoiding wasted retries
+ *   <li>Automatic recovery detection
+ *   <li>Reduced load on failing servers (prevents cascading failure)
+ *   <li>Improved system resilience
+ * </ul>
+ *
+ * <p><b>Thread Safety:</b> This class is thread-safe and designed for 
concurrent access from
+ * multiple Flink task threads.
+ *
+ * @see TritonHealthChecker
+ */
+public class TritonCircuitBreaker {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TritonCircuitBreaker.class);
+
+    /** Current state of the circuit breaker. */
+    public enum State {
+        /** Normal operation, requests allowed, tracking failures. */
+        CLOSED,
+        /** Server unhealthy, failing fast without hitting server. */
+        OPEN,
+        /** Testing recovery with limited requests. */
+        HALF_OPEN
+    }
+
+    private final String endpoint;
+    private final double failureThreshold;
+    private final Duration openStateDuration;
+    private final int halfOpenMaxRequests;
+
+    private final AtomicReference<State> state = new 
AtomicReference<>(State.CLOSED);
+    private final AtomicLong lastStateTransitionTime = new 
AtomicLong(System.currentTimeMillis());
+
+    // Metrics for CLOSED state
+    private final AtomicInteger totalRequests = new AtomicInteger(0);
+    private final AtomicInteger failedRequests = new AtomicInteger(0);
+
+    // Metrics for HALF_OPEN state
+    private final AtomicInteger halfOpenSuccesses = new AtomicInteger(0);
+    private final AtomicInteger halfOpenFailures = new AtomicInteger(0);
+    private final AtomicInteger halfOpenRequests = new AtomicInteger(0);
+
+    /**
+     * Minimum number of requests before evaluating failure rate. This 
prevents opening the circuit
+     * based on too few samples.
+     */
+    private static final int MIN_REQUESTS_THRESHOLD = 10;
+
+    /**
+     * Creates a new circuit breaker for a Triton endpoint.
+     *
+     * @param endpoint The Triton server endpoint URL
+     * @param failureThreshold Failure rate (0.0-1.0) that triggers circuit 
opening
+     * @param openStateDuration How long to stay OPEN before transitioning to 
HALF_OPEN
+     * @param halfOpenMaxRequests Number of successful test requests needed in 
HALF_OPEN to close
+     */
+    public TritonCircuitBreaker(
+            String endpoint,
+            double failureThreshold,
+            Duration openStateDuration,
+            int halfOpenMaxRequests) {
+        this.endpoint = endpoint;
+        this.failureThreshold = failureThreshold;
+        this.openStateDuration = openStateDuration;
+        this.halfOpenMaxRequests = halfOpenMaxRequests;
+
+        LOG.info(
+                "Circuit breaker created for endpoint {} with threshold={}, 
openDuration={}, halfOpenRequests={}",
+                endpoint,
+                failureThreshold,
+                openStateDuration,
+                halfOpenMaxRequests);
+    }
+
+    /**
+     * Checks if a request is allowed through the circuit breaker.
+     *
+     * @return true if request should proceed, false if should fail fast
+     * @throws TritonCircuitBreakerOpenException if circuit is OPEN
+     */
+    public boolean allowRequest() throws TritonCircuitBreakerOpenException {

Review Comment:
   nit: maybe rename to `isRequestAllowed`. `allowRequest` reads as a command 
that lets the request through, whereas the Javadoc says the method checks 
whether a request is allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to