parveensania commented on code in PR #37840:
URL: https://github.com/apache/beam/pull/37840#discussion_r3018146411


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -82,6 +85,74 @@ private static final class FailoverState {
     // Time when primary first became not-ready. -1 when primary is currently READY.
     @GuardedBy("this")
     long primaryNotReadySinceNanos = -1;
+
+    private final int channelId;
+
+    FailoverState(int channelId) {
+      this.channelId = channelId;
+    }
+
+    /**
+     * Determines whether the next RPC should route to the fallback channel, updating internal state
+     * as needed.
+     */
+    synchronized boolean computeUseFallback(long nowNanos) {
+      // Clear RPC-based fallback if the cooling period has elapsed.
+      if (useFallbackDueToRPC
+          && nowNanos - lastRPCFallbackTimeNanos >= FALLBACK_COOLING_PERIOD_NANOS) {
+        useFallbackDueToRPC = false;
+        LOG.info(
+            "[channel-{}] Primary channel cooling period elapsed; switching back from fallback.",
+            channelId);
+      }
+      // Check if primary has been not-ready long enough to switch to fallback.
+      // primaryNotReadySinceNanos is set by the state-change callback when primary is not ready.
+      if (!useFallbackDueToRPC
+          && !useFallbackDueToState
+          && primaryNotReadySinceNanos >= 0
+          && nowNanos - primaryNotReadySinceNanos > PRIMARY_NOT_READY_WAIT_NANOS) {
+        useFallbackDueToState = true;
+        LOG.warn(
+            "[channel-{}] Primary connection unavailable. Switching to secondary connection.",
+            channelId);
+      }
+      return useFallbackDueToRPC || useFallbackDueToState;
+    }
+
+    /**
+     * Starts the not-ready grace period timer. Called by the state-change callback when primary
+     * transitions to a non-ready state. Has no effect if already tracking or already on fallback.
+     */
+    synchronized void markPrimaryNotReady(long nowNanos) {
+      if (!useFallbackDueToRPC && !useFallbackDueToState && primaryNotReadySinceNanos < 0) {
+        primaryNotReadySinceNanos = nowNanos;
+      }
+    }
+
+    /**
+     * Transitions the fallback state. When toFallback is true (RPC failure) it enables RPC-based
+     * fallback if not already active and returns true so the caller can log the failure details.
+     * When toFallback is false (primary recovered) it clears all fallback flags and returns true if
+     * recovery actually changed state, so the caller can log it.
+     */
+    synchronized boolean transitionFallback(boolean toFallback, long nowNanos) {

Review Comment:
   Split the method into two, as suggested. Also added a 30-second wait before falling back for RPC-based failover.
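One plausible shape for that split is a dedicated RPC-failure path and a dedicated recovery path, replacing the single `transitionFallback(boolean, long)`. This is a minimal sketch under stated assumptions and not the PR's actual code. The method names `onRpcFailure` and `onPrimaryRecovered`, the constant `RPC_FAILURE_WAIT_NANOS`, the field `firstRpcFailureNanos`, and the 60-second cooling-period value are all hypothetical. It also assumes "30sec wait" means that RPC failures must persist for 30 seconds before traffic is routed to fallback.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: FailoverState with transitionFallback split into two methods and a 30 s
 * wait before RPC-based fallback. Names and the cooling-period value are assumptions.
 */
final class FailoverStateSketch {
  // Assumption: RPC failures must persist this long before routing to fallback.
  static final long RPC_FAILURE_WAIT_NANOS = TimeUnit.SECONDS.toNanos(30);
  // Assumption: placeholder value; the PR defines its own FALLBACK_COOLING_PERIOD_NANOS.
  static final long FALLBACK_COOLING_PERIOD_NANOS = TimeUnit.SECONDS.toNanos(60);

  private boolean useFallbackDueToRPC = false;
  // Set by the state-change path (markPrimaryNotReady / computeUseFallback), omitted here.
  private boolean useFallbackDueToState = false;
  private long lastRPCFallbackTimeNanos = -1;
  // Hypothetical field: start of the current run of RPC failures, -1 when none.
  private long firstRpcFailureNanos = -1;

  /** RPC-failure path. Returns true only on the call that actually switches to fallback. */
  synchronized boolean onRpcFailure(long nowNanos) {
    if (useFallbackDueToRPC) {
      return false; // Already on fallback; nothing new to log.
    }
    if (firstRpcFailureNanos < 0) {
      firstRpcFailureNanos = nowNanos; // First failure starts the wait window.
      return false;
    }
    if (nowNanos - firstRpcFailureNanos < RPC_FAILURE_WAIT_NANOS) {
      return false; // Still inside the wait window.
    }
    useFallbackDueToRPC = true;
    lastRPCFallbackTimeNanos = nowNanos;
    firstRpcFailureNanos = -1;
    return true;
  }

  /** Recovery path. Clears all fallback state; returns true if a fallback flag was set. */
  synchronized boolean onPrimaryRecovered() {
    boolean wasOnFallback = useFallbackDueToRPC || useFallbackDueToState;
    useFallbackDueToRPC = false;
    useFallbackDueToState = false;
    firstRpcFailureNanos = -1;
    return wasOnFallback;
  }

  /** RPC half of computeUseFallback from the diff: expire RPC fallback after cooling. */
  synchronized boolean computeUseFallback(long nowNanos) {
    if (useFallbackDueToRPC
        && nowNanos - lastRPCFallbackTimeNanos >= FALLBACK_COOLING_PERIOD_NANOS) {
      useFallbackDueToRPC = false;
    }
    return useFallbackDueToRPC || useFallbackDueToState;
  }
}
```

Splitting the method by intent removes the boolean parameter whose meaning flips between call sites. Each return value then has a single meaning: "this call just switched to fallback" or "recovery changed something". That makes the caller's logging decision unambiguous.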


