davidradl commented on code in PR #27567: URL: https://github.com/apache/flink/pull/27567#discussion_r2788455349
########## flink-models/flink-model-triton/src/main/java/org/apache/flink/model/triton/TritonCircuitBreaker.java: ########## @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.model.triton; + +import org.apache.flink.model.triton.exception.TritonCircuitBreakerOpenException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Circuit breaker implementation for Triton Inference Server health management. + * + * <p>This circuit breaker follows the classic three-state model to protect the system from + * cascading failures when the Triton server becomes unhealthy: + * + * <ul> + * <li><b>CLOSED</b>: Normal operation. Requests are allowed. Tracks failure rate. + * <li><b>OPEN</b>: Triton is unhealthy. All requests fail fast without hitting the server. + * <li><b>HALF_OPEN</b>: Testing recovery. Limited requests allowed to probe server health. 
+ * </ul> + * + * <p><b>State Transitions:</b> + * + * <pre> + * CLOSED ──[failure rate > threshold]──> OPEN + * ↑ │ + * │ │ [after timeout] + * │ ↓ + * └──[success count met]── HALF_OPEN + * </pre> + * + * <p><b>Benefits:</b> + * + * <ul> + * <li>Fail fast when server is down, avoiding wasted retries + * <li>Automatic recovery detection + * <li>Reduced load on failing servers (prevents cascading failure) + * <li>Improved system resilience + * </ul> + * + * <p><b>Thread Safety:</b> This class is thread-safe and designed for concurrent access from + * multiple Flink task threads. + * + * @see TritonHealthChecker + */ +public class TritonCircuitBreaker { + private static final Logger LOG = LoggerFactory.getLogger(TritonCircuitBreaker.class); + + /** Current state of the circuit breaker. */ + public enum State { + /** Normal operation, requests allowed, tracking failures. */ + CLOSED, + /** Server unhealthy, failing fast without hitting server. */ + OPEN, + /** Testing recovery with limited requests. */ + HALF_OPEN + } + + private final String endpoint; + private final double failureThreshold; + private final Duration openStateDuration; + private final int halfOpenMaxRequests; + + private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED); + private final AtomicLong lastStateTransitionTime = new AtomicLong(System.currentTimeMillis()); + + // Metrics for CLOSED state + private final AtomicInteger totalRequests = new AtomicInteger(0); + private final AtomicInteger failedRequests = new AtomicInteger(0); + + // Metrics for HALF_OPEN state + private final AtomicInteger halfOpenSuccesses = new AtomicInteger(0); + private final AtomicInteger halfOpenFailures = new AtomicInteger(0); + private final AtomicInteger halfOpenRequests = new AtomicInteger(0); + + /** + * Minimum number of requests before evaluating failure rate. This prevents opening the circuit + * based on too few samples. 
+ */ + private static final int MIN_REQUESTS_THRESHOLD = 10; + + /** + * Creates a new circuit breaker for a Triton endpoint. + * + * @param endpoint The Triton server endpoint URL + * @param failureThreshold Failure rate (0.0-1.0) that triggers circuit opening + * @param openStateDuration How long to stay OPEN before transitioning to HALF_OPEN + * @param halfOpenMaxRequests Number of successful test requests needed in HALF_OPEN to close + */ + public TritonCircuitBreaker( + String endpoint, + double failureThreshold, + Duration openStateDuration, + int halfOpenMaxRequests) { + this.endpoint = endpoint; + this.failureThreshold = failureThreshold; + this.openStateDuration = openStateDuration; + this.halfOpenMaxRequests = halfOpenMaxRequests; + + LOG.info( + "Circuit breaker created for endpoint {} with threshold={}, openDuration={}, halfOpenRequests={}", + endpoint, + failureThreshold, + openStateDuration, + halfOpenMaxRequests); + } + + /** + * Checks if a request is allowed through the circuit breaker. + * + * @return true if request should proceed, false if should fail fast + * @throws TritonCircuitBreakerOpenException if circuit is OPEN + */ + public boolean allowRequest() throws TritonCircuitBreakerOpenException { Review Comment: Nit: maybe rename this to `isRequestAllowed`. The name `allowRequest` implies that we are allowing the request, rather than checking whether it is allowed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
