scwhittle commented on code in PR #37840:
URL: https://github.com/apache/beam/pull/37840#discussion_r2960282431


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period

Review Comment:
   Should the cooling period also end early if the channel goes through an
unhealthy->healthy connectivity transition?
   I want to make sure that a race, where we observe an RPC failure before we
observe the connectivity failure, doesn't cause us to stop using the primary
channel if it reestablishes quickly.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      @Nullable ManagedChannel fallback,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    this.primary = primary;
+    this.fallback = fallback;
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  // Test-only.
+  public static FailoverChannel create(ManagedChannel primary, ManagedChannel 
fallback) {
+    return new FailoverChannel(primary, fallback, null, System::nanoTime);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
System::nanoTime);
+  }
+
+  static FailoverChannel forTest(
+      ManagedChannel primary,
+      ManagedChannel fallback,
+      CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
nanoClock);
+  }
+
+  @Override
+  public String authority() {
+    return primary.authority();
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
+      MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) 
{
+    // Check if the RPC-based cooling period has elapsed.
+    if (useFallbackDueToRPC.get()) {
+      long timeSinceLastFallback = nanoClock.getAsLong() - 
lastRPCFallbackTimeNanos.get();
+      if (timeSinceLastFallback >= FALLBACK_COOLING_PERIOD_NANOS) {
+        if (useFallbackDueToRPC.compareAndSet(true, false)) {
+          LOG.info("Primary channel cooling period elapsed; switching back 
from fallback.");
+        }
+      }
+    }
+
+    if (fallback != null && (useFallbackDueToRPC.get() || 
useFallbackDueToState.get())) {
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    // If primary has not become ready for a sustained period, fail over to 
fallback.
+    if (fallback != null && shouldFallBackDueToPrimaryState()) {
+      if (useFallbackDueToState.compareAndSet(false, true)) {
+        LOG.warn("Primary connection unavailable. Switching to secondary 
connection.");
+      }
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    return new FailoverClientCall<>(
+        primary.newCall(methodDescriptor, callOptions),
+        false,
+        methodDescriptor.getFullMethodName());
+  }
+
+  @Override
+  public ManagedChannel shutdown() {
+    primary.shutdown();
+    if (fallback != null) {
+      fallback.shutdown();
+    }
+    return this;
+  }
+
+  @Override
+  public ManagedChannel shutdownNow() {
+    primary.shutdownNow();
+    if (fallback != null) {
+      fallback.shutdownNow();
+    }
+    return this;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return primary.isShutdown() && (fallback == null || fallback.isShutdown());
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return primary.isTerminated() && (fallback == null || 
fallback.isTerminated());
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    long endTimeNanos = nanoClock.getAsLong() + unit.toNanos(timeout);
+    boolean primaryTerminated = primary.awaitTermination(timeout, unit);
+    if (fallback != null) {
+      long remainingNanos = Math.max(0, endTimeNanos - nanoClock.getAsLong());
+      return primaryTerminated && fallback.awaitTermination(remainingNanos, 
TimeUnit.NANOSECONDS);
+    }
+    return primaryTerminated;
+  }
+
+  private boolean shouldFallbackBasedOnRPCStatus(Status status) {
+    switch (status.getCode()) {
+      case UNAVAILABLE:
+      case DEADLINE_EXCEEDED:
+      case UNKNOWN:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  private boolean hasFallbackChannel() {
+    return fallback != null;
+  }
+
+  private CallOptions applyFallbackCredentials(CallOptions callOptions) {
+    if (fallbackCallCredentials != null && callOptions.getCredentials() == 
null) {
+      return callOptions.withCallCredentials(fallbackCallCredentials);
+    }
+    return callOptions;
+  }
+
+  private boolean shouldFallBackDueToPrimaryState() {
+    ConnectivityState connectivityState = primary.getState(true);

Review Comment:
   Passing `true` sounds like it might block while attempting to connect if the
channel is in the IDLE state. How about passing `false` and treating IDLE as a
state that does not need to be fallen back from?
   
   Or could we just remove this, since we are setting up a change listener to
observe its state changes anyway?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      @Nullable ManagedChannel fallback,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    this.primary = primary;
+    this.fallback = fallback;
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  // Test-only.
+  public static FailoverChannel create(ManagedChannel primary, ManagedChannel 
fallback) {
+    return new FailoverChannel(primary, fallback, null, System::nanoTime);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
System::nanoTime);
+  }
+
+  static FailoverChannel forTest(
+      ManagedChannel primary,
+      ManagedChannel fallback,
+      CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
nanoClock);
+  }
+
+  @Override
+  public String authority() {
+    return primary.authority();
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
+      MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) 
{
+    // Check if the RPC-based cooling period has elapsed.
+    if (useFallbackDueToRPC.get()) {
+      long timeSinceLastFallback = nanoClock.getAsLong() - 
lastRPCFallbackTimeNanos.get();
+      if (timeSinceLastFallback >= FALLBACK_COOLING_PERIOD_NANOS) {
+        if (useFallbackDueToRPC.compareAndSet(true, false)) {
+          LOG.info("Primary channel cooling period elapsed; switching back 
from fallback.");
+        }
+      }
+    }
+
+    if (fallback != null && (useFallbackDueToRPC.get() || 
useFallbackDueToState.get())) {
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    // If primary has not become ready for a sustained period, fail over to 
fallback.
+    if (fallback != null && shouldFallBackDueToPrimaryState()) {
+      if (useFallbackDueToState.compareAndSet(false, true)) {
+        LOG.warn("Primary connection unavailable. Switching to secondary 
connection.");
+      }
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    return new FailoverClientCall<>(
+        primary.newCall(methodDescriptor, callOptions),
+        false,
+        methodDescriptor.getFullMethodName());
+  }
+
+  @Override
+  public ManagedChannel shutdown() {
+    primary.shutdown();
+    if (fallback != null) {
+      fallback.shutdown();
+    }
+    return this;
+  }
+
+  @Override
+  public ManagedChannel shutdownNow() {
+    primary.shutdownNow();
+    if (fallback != null) {
+      fallback.shutdownNow();
+    }
+    return this;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return primary.isShutdown() && (fallback == null || fallback.isShutdown());
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return primary.isTerminated() && (fallback == null || 
fallback.isTerminated());
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    long endTimeNanos = nanoClock.getAsLong() + unit.toNanos(timeout);
+    boolean primaryTerminated = primary.awaitTermination(timeout, unit);
+    if (fallback != null) {
+      long remainingNanos = Math.max(0, endTimeNanos - nanoClock.getAsLong());
+      return primaryTerminated && fallback.awaitTermination(remainingNanos, 
TimeUnit.NANOSECONDS);
+    }
+    return primaryTerminated;
+  }
+
+  private boolean shouldFallbackBasedOnRPCStatus(Status status) {
+    switch (status.getCode()) {
+      case UNAVAILABLE:
+      case DEADLINE_EXCEEDED:

Review Comment:
   I'm worried that DEADLINE_EXCEEDED might occur for other reasons too.
   
   One idea might be to check whether the call received any responses; if so, we
know that it was connected to the backend at some point, and we could choose
not to fall back.
   We could also perhaps wait for several consecutive failed RPCs, or for RPCs to
keep failing over some elapsed time period, before falling back.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      @Nullable ManagedChannel fallback,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    this.primary = primary;
+    this.fallback = fallback;
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  // Test-only.

Review Comment:
   How about removing this one, then? The test can have its own helper that
calls `forTest` below with the default credentials and time supplier.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      @Nullable ManagedChannel fallback,

Review Comment:
   Can we just not support null here? It seems the caller could simply use the
primary channel without creating a FailoverChannel if they don't want fallback
support, and then we wouldn't have to complicate the code with `fallback`
possibly being null.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);

Review Comment:
   Can we move all the Atomics into a State object that we synchronize on? We
have long-lived calls, so I don't think we need to worry about the performance
of a synchronized block versus atomics in the call creation path, as long as we
are not doing any blocking work inside it.
   I think it will help keep the code simpler, and we won't have to worry about
the possible weird states that races could put us in.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -804,20 +809,37 @@ private static void 
validateWorkerOptions(DataflowWorkerHarnessOptions options)
   }
 
   private static ChannelCache createChannelCache(
-      DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher 
configFetcher) {
+      DataflowWorkerHarnessOptions workerOptions,
+      ComputationConfig.Fetcher configFetcher,
+      GrpcDispatcherClient dispatcherClient) {
     ChannelCache channelCache =
         ChannelCache.create(
             (currentFlowControlSettings, serviceAddress) -> {
-              // IsolationChannel will create and manage separate RPC channels 
to the same
-              // serviceAddress.
-              return IsolationChannel.create(
-                  () ->
-                      remoteChannel(
-                          serviceAddress,
-                          
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
-                          currentFlowControlSettings),
-                  currentFlowControlSettings.getOnReadyThresholdBytes());
+              ManagedChannel primaryChannel =
+                  IsolationChannel.create(

Review Comment:
   Since it's being set up this way, IsolationChannel's connectivity callbacks
are what will be used. I'm not sure how that will work, since it internally
has multiple channels. Looking at it, it seems to just have the default
ManagedChannel implementation, which throws an unimplemented exception.
   
   What about putting IsolationChannel on top of the fallback channels? That
seems simpler to me, since IsolationChannel just internally creates the
separate channels and otherwise doesn't do much other than forward things on.
   
   It would be good to have a unit test of whichever setup we do use, so that
we flush out issues there instead of relying on an integration test.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      @Nullable ManagedChannel fallback,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    this.primary = primary;
+    this.fallback = fallback;
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  // Test-only.
+  public static FailoverChannel create(ManagedChannel primary, ManagedChannel 
fallback) {
+    return new FailoverChannel(primary, fallback, null, System::nanoTime);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
System::nanoTime);
+  }
+
+  static FailoverChannel forTest(
+      ManagedChannel primary,
+      ManagedChannel fallback,
+      CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
nanoClock);
+  }
+
+  @Override
+  public String authority() {
+    return primary.authority();
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
+      MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) 
{
+    // Check if the RPC-based cooling period has elapsed.
+    if (useFallbackDueToRPC.get()) {
+      long timeSinceLastFallback = nanoClock.getAsLong() - 
lastRPCFallbackTimeNanos.get();
+      if (timeSinceLastFallback >= FALLBACK_COOLING_PERIOD_NANOS) {
+        if (useFallbackDueToRPC.compareAndSet(true, false)) {
+          LOG.info("Primary channel cooling period elapsed; switching back 
from fallback.");
+        }
+      }
+    }
+
+    if (fallback != null && (useFallbackDueToRPC.get() || 
useFallbackDueToState.get())) {
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    // If primary has not become ready for a sustained period, fail over to 
fallback.
+    if (fallback != null && shouldFallBackDueToPrimaryState()) {
+      if (useFallbackDueToState.compareAndSet(false, true)) {
+        LOG.warn("Primary connection unavailable. Switching to secondary 
connection.");
+      }
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    return new FailoverClientCall<>(
+        primary.newCall(methodDescriptor, callOptions),
+        false,
+        methodDescriptor.getFullMethodName());
+  }
+
+  @Override
+  public ManagedChannel shutdown() {
+    primary.shutdown();
+    if (fallback != null) {
+      fallback.shutdown();
+    }
+    return this;
+  }
+
+  @Override
+  public ManagedChannel shutdownNow() {
+    primary.shutdownNow();
+    if (fallback != null) {
+      fallback.shutdownNow();
+    }
+    return this;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return primary.isShutdown() && (fallback == null || fallback.isShutdown());
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return primary.isTerminated() && (fallback == null || 
fallback.isTerminated());
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    long endTimeNanos = nanoClock.getAsLong() + unit.toNanos(timeout);
+    boolean primaryTerminated = primary.awaitTermination(timeout, unit);
+    if (fallback != null) {
+      long remainingNanos = Math.max(0, endTimeNanos - nanoClock.getAsLong());
+      return primaryTerminated && fallback.awaitTermination(remainingNanos, 
TimeUnit.NANOSECONDS);
+    }
+    return primaryTerminated;
+  }
+
+  private boolean shouldFallbackBasedOnRPCStatus(Status status) {
+    switch (status.getCode()) {
+      case UNAVAILABLE:
+      case DEADLINE_EXCEEDED:
+      case UNKNOWN:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  private boolean hasFallbackChannel() {
+    return fallback != null;
+  }
+
+  private CallOptions applyFallbackCredentials(CallOptions callOptions) {
+    if (fallbackCallCredentials != null && callOptions.getCredentials() == 
null) {
+      return callOptions.withCallCredentials(fallbackCallCredentials);
+    }
+    return callOptions;
+  }
+
+  private boolean shouldFallBackDueToPrimaryState() {
+    ConnectivityState connectivityState = primary.getState(true);
+    if (connectivityState == ConnectivityState.READY) {
+      primaryNotReadySinceNanos.set(-1);
+      return false;
+    }
+    long currentTimeNanos = nanoClock.getAsLong();
+    if (primaryNotReadySinceNanos.get() < 0) {
+      primaryNotReadySinceNanos.set(currentTimeNanos);
+    }
+    return currentTimeNanos - primaryNotReadySinceNanos.get() > 
PRIMARY_NOT_READY_WAIT_NANOS;
+  }
+
+  private void notifyFailure(Status status, boolean isFallback, String 
methodName) {
+    if (!status.isOk()
+        && !isFallback
+        && hasFallbackChannel()
+        && shouldFallbackBasedOnRPCStatus(status)) {
+      if (useFallbackDueToRPC.compareAndSet(false, true)) {
+        lastRPCFallbackTimeNanos.set(nanoClock.getAsLong());
+        LOG.warn(
+            "Primary connection failed for method: {}. Switching to secondary 
connection. Status: {}",
+            methodName,
+            status.getCode());
+      }
+    } else if (isFallback && !status.isOk()) {
+      LOG.warn(
+          "Secondary connection failed for method: {}. Status: {}", 
methodName, status.getCode());
+    }
+  }
+
+  private final class FailoverClientCall<ReqT, RespT>
+      extends SimpleForwardingClientCall<ReqT, RespT> {
+    private final boolean isFallback;
+    private final String methodName;
+
+    /**
+     * @param delegate the underlying ClientCall (either primary or fallback)
+     * @param isFallback true if {@code delegate} is a fallback channel call, 
false if it is a
+     *     primary channel call. This flag is inspected by {@link 
#notifyFailure} to determine
+     *     whether a failure should trigger switching to the fallback channel 
(only primary failures
+     *     do).
+     * @param methodName full gRPC method name (for logging)
+     */
+    FailoverClientCall(ClientCall<ReqT, RespT> delegate, boolean isFallback, 
String methodName) {
+      super(delegate);
+      this.isFallback = isFallback;
+      this.methodName = methodName;
+    }
+
+    @Override
+    public void start(Listener<RespT> responseListener, Metadata headers) {
+      super.start(
+          new SimpleForwardingClientCallListener<RespT>(responseListener) {
+            @Override
+            public void onClose(Status status, Metadata trailers) {

Review Comment:
   Here is where I was wondering: could we hook into onMessage or onHeaders to
determine that the call did make some progress before possibly failing with
DEADLINE_EXCEEDED or UNAVAILABLE (which could possibly come from the backend's
status)?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/FailoverChannel.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallCredentials;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ConnectivityState;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
+import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Metadata;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.MethodDescriptor;
+import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ManagedChannel} that wraps a primary and a fallback channel.
+ *
+ * <p>Routes requests to either primary or fallback channel based on two 
independent failover modes:
+ *
+ * <ul>
+ *   <li><b>Connection Status Failover:</b> If the primary channel is not 
ready for 10+ seconds
+ *       (e.g., during network issues), routes to fallback channel. Switches 
back as soon as the
+ *       primary channel becomes READY again.
+ *   <li><b>RPC Failover:</b> If primary channel RPC fails with transient 
errors ({@link
+ *       Status.Code#UNAVAILABLE}, {@link Status.Code#DEADLINE_EXCEEDED}, or 
{@link
+ *       Status.Code#UNKNOWN}), switches to fallback channel and waits for a 
1-hour cooling period
+ *       before retrying primary.
+ * </ul>
+ */
+@Internal
+public final class FailoverChannel extends ManagedChannel {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FailoverChannel.class);
+  // Time to wait before retrying the primary channel after an RPC-based 
fallback.
+  private static final long FALLBACK_COOLING_PERIOD_NANOS = 
TimeUnit.HOURS.toNanos(1);
+  private static final long PRIMARY_NOT_READY_WAIT_NANOS = 
TimeUnit.SECONDS.toNanos(10);
+  private final ManagedChannel primary;
+  @Nullable private final ManagedChannel fallback;
+  @Nullable private final CallCredentials fallbackCallCredentials;
+  // Set when primary's connection state has been unavailable for too long.
+  private final AtomicBoolean useFallbackDueToState = new AtomicBoolean(false);
+  // Set when an RPC on primary fails with a transient error.
+  private final AtomicBoolean useFallbackDueToRPC = new AtomicBoolean(false);
+  private final AtomicLong lastRPCFallbackTimeNanos = new AtomicLong(0);
+  private final AtomicLong primaryNotReadySinceNanos = new AtomicLong(-1);
+  private final LongSupplier nanoClock;
+  private final AtomicBoolean stateChangeListenerRegistered = new 
AtomicBoolean(false);
+
+  private FailoverChannel(
+      ManagedChannel primary,
+      @Nullable ManagedChannel fallback,
+      @Nullable CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    this.primary = primary;
+    this.fallback = fallback;
+    this.fallbackCallCredentials = fallbackCallCredentials;
+    this.nanoClock = nanoClock;
+    // Register callback to monitor primary channel state changes
+    registerPrimaryStateChangeListener();
+  }
+
+  // Test-only.
+  public static FailoverChannel create(ManagedChannel primary, ManagedChannel 
fallback) {
+    return new FailoverChannel(primary, fallback, null, System::nanoTime);
+  }
+
+  public static FailoverChannel create(
+      ManagedChannel primary, ManagedChannel fallback, CallCredentials 
fallbackCallCredentials) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
System::nanoTime);
+  }
+
+  static FailoverChannel forTest(
+      ManagedChannel primary,
+      ManagedChannel fallback,
+      CallCredentials fallbackCallCredentials,
+      LongSupplier nanoClock) {
+    return new FailoverChannel(primary, fallback, fallbackCallCredentials, 
nanoClock);
+  }
+
+  @Override
+  public String authority() {
+    return primary.authority();
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
+      MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) 
{
+    // Check if the RPC-based cooling period has elapsed.
+    if (useFallbackDueToRPC.get()) {
+      long timeSinceLastFallback = nanoClock.getAsLong() - 
lastRPCFallbackTimeNanos.get();
+      if (timeSinceLastFallback >= FALLBACK_COOLING_PERIOD_NANOS) {
+        if (useFallbackDueToRPC.compareAndSet(true, false)) {
+          LOG.info("Primary channel cooling period elapsed; switching back 
from fallback.");
+        }
+      }
+    }
+
+    if (fallback != null && (useFallbackDueToRPC.get() || 
useFallbackDueToState.get())) {
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    // If primary has not become ready for a sustained period, fail over to 
fallback.
+    if (fallback != null && shouldFallBackDueToPrimaryState()) {
+      if (useFallbackDueToState.compareAndSet(false, true)) {
+        LOG.warn("Primary connection unavailable. Switching to secondary 
connection.");
+      }
+      return new FailoverClientCall<>(
+          fallback.newCall(methodDescriptor, 
applyFallbackCredentials(callOptions)),
+          true,
+          methodDescriptor.getFullMethodName());
+    }
+
+    return new FailoverClientCall<>(
+        primary.newCall(methodDescriptor, callOptions),
+        false,
+        methodDescriptor.getFullMethodName());
+  }
+
+  @Override
+  public ManagedChannel shutdown() {
+    primary.shutdown();
+    if (fallback != null) {
+      fallback.shutdown();
+    }
+    return this;
+  }
+
+  @Override
+  public ManagedChannel shutdownNow() {
+    primary.shutdownNow();
+    if (fallback != null) {
+      fallback.shutdownNow();
+    }
+    return this;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return primary.isShutdown() && (fallback == null || fallback.isShutdown());
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return primary.isTerminated() && (fallback == null || 
fallback.isTerminated());
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    long endTimeNanos = nanoClock.getAsLong() + unit.toNanos(timeout);
+    boolean primaryTerminated = primary.awaitTermination(timeout, unit);
+    if (fallback != null) {
+      long remainingNanos = Math.max(0, endTimeNanos - nanoClock.getAsLong());
+      return primaryTerminated && fallback.awaitTermination(remainingNanos, 
TimeUnit.NANOSECONDS);
+    }
+    return primaryTerminated;
+  }
+
+  private boolean shouldFallbackBasedOnRPCStatus(Status status) {
+    switch (status.getCode()) {
+      case UNAVAILABLE:
+      case DEADLINE_EXCEEDED:
+      case UNKNOWN:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  private boolean hasFallbackChannel() {
+    return fallback != null;
+  }
+
+  private CallOptions applyFallbackCredentials(CallOptions callOptions) {
+    if (fallbackCallCredentials != null && callOptions.getCredentials() == 
null) {
+      return callOptions.withCallCredentials(fallbackCallCredentials);
+    }
+    return callOptions;
+  }
+
+  private boolean shouldFallBackDueToPrimaryState() {
+    ConnectivityState connectivityState = primary.getState(true);
+    if (connectivityState == ConnectivityState.READY) {
+      primaryNotReadySinceNanos.set(-1);
+      return false;
+    }
+    long currentTimeNanos = nanoClock.getAsLong();
+    if (primaryNotReadySinceNanos.get() < 0) {
+      primaryNotReadySinceNanos.set(currentTimeNanos);
+    }
+    return currentTimeNanos - primaryNotReadySinceNanos.get() > 
PRIMARY_NOT_READY_WAIT_NANOS;
+  }
+
+  private void notifyFailure(Status status, boolean isFallback, String 
methodName) {

Review Comment:
   Nit: rename this to notifyCallDone? We call it on success too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to