This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 726cf68715 Update FailureDetector recovery logic to not break if only one query engine is being used (#15086)
726cf68715 is described below

commit 726cf68715c6b97b42b6ad829cf2aed1ee60c725
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 20 22:23:56 2025 +0530

    Update FailureDetector recovery logic to not break if only one query engine is being used (#15086)
---
 .../requesthandler/GrpcBrokerRequestHandler.java   | 22 ++++++++++++------
 .../MultiStageBrokerRequestHandler.java            |  4 ++--
 .../SingleConnectionBrokerRequestHandler.java      | 15 +++++++++----
 ...BaseExponentialBackoffRetryFailureDetector.java | 10 +++++----
 .../common/failuredetector/FailureDetector.java    | 12 ++++++++--
 .../failuredetector/NoOpFailureDetector.java       |  2 +-
 .../ConnectionFailureDetectorTest.java             | 26 +++++++++++++---------
 .../apache/pinot/core/transport/QueryRouter.java   |  8 +++++++
 .../pinot/core/transport/ServerChannels.java       |  4 ++++
 .../query/service/dispatch/QueryDispatcher.java    | 20 ++++++++++++-----
 10 files changed, 87 insertions(+), 36 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index d192a6034c..7f90f5692a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -166,22 +166,30 @@ public class GrpcBrokerRequestHandler extends 
BaseSingleStageBrokerRequestHandle
   /**
    * Check if a server that was previously detected as unhealthy is now 
healthy.
    */
-  private boolean retryUnhealthyServer(String instanceId) {
+  private FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
     LOGGER.info("Checking gRPC connection to unhealthy server: {}", 
instanceId);
     ServerInstance serverInstance = 
_routingManager.getEnabledServerInstanceMap().get(instanceId);
     if (serverInstance == null) {
       LOGGER.info("Failed to find enabled server: {} in routing manager, 
skipping the retry", instanceId);
-      return false;
+      return FailureDetector.ServerState.UNHEALTHY;
     }
 
     String hostnamePort = 
_streamingQueryClient._instanceIdToHostnamePortMap.get(instanceId);
+    // Could occur if the cluster is only serving multi-stage queries
+    if (hostnamePort == null) {
+      LOGGER.debug("No GrpcQueryClient found for server with instanceId: {}", 
instanceId);
+      return FailureDetector.ServerState.UNKNOWN;
+    }
+
     ServerGrpcQueryClient client = 
_streamingQueryClient._grpcQueryClientMap.get(hostnamePort);
 
-    if (client == null) {
-      LOGGER.warn("No GrpcQueryClient found for server with instanceId: {}", 
instanceId);
-      return false;
+    ConnectivityState connectivityState = client.getChannel().getState(true);
+    if (connectivityState == ConnectivityState.READY) {
+      LOGGER.info("Successfully connected to server: {}", instanceId);
+      return FailureDetector.ServerState.HEALTHY;
+    } else {
+      LOGGER.info("Still can't connect to server: {}, current state: {}", 
instanceId, connectivityState);
+      return FailureDetector.ServerState.UNHEALTHY;
     }
-
-    return client.getChannel().getState(true) == ConnectivityState.READY;
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d17361926d..fac9c86562 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -551,12 +551,12 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   /**
    * Check if a server that was previously detected as unhealthy is now 
healthy.
    */
-  public boolean retryUnhealthyServer(String instanceId) {
+  public FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
     LOGGER.info("Checking gRPC connection to unhealthy server: {}", 
instanceId);
     ServerInstance serverInstance = 
_routingManager.getEnabledServerInstanceMap().get(instanceId);
     if (serverInstance == null) {
       LOGGER.info("Failed to find enabled server: {} in routing manager, 
skipping the retry", instanceId);
-      return false;
+      return FailureDetector.ServerState.UNHEALTHY;
     }
 
     return _queryDispatcher.checkConnectivityToInstance(instanceId);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 28143565a1..98ef386d7c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -179,19 +179,26 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseSingleStageBrokerR
   /**
    * Check if a server that was previously detected as unhealthy is now 
healthy.
    */
-  public boolean retryUnhealthyServer(String instanceId) {
+  public FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
     LOGGER.info("Retrying unhealthy server: {}", instanceId);
     ServerInstance serverInstance = 
_routingManager.getEnabledServerInstanceMap().get(instanceId);
+
     if (serverInstance == null) {
       LOGGER.info("Failed to find enabled server: {} in routing manager, 
skipping the retry", instanceId);
-      return false;
+      return FailureDetector.ServerState.UNHEALTHY;
+    }
+
+    // Could occur if the cluster is only serving multi-stage queries
+    if (!_queryRouter.hasChannel(serverInstance)) {
+      return FailureDetector.ServerState.UNKNOWN;
     }
+
     if (_queryRouter.connect(serverInstance)) {
       LOGGER.info("Successfully connect to server: {}, marking it healthy", 
instanceId);
-      return true;
+      return FailureDetector.ServerState.HEALTHY;
     } else {
       LOGGER.warn("Still cannot connect to server: {}, retry later", 
instanceId);
-      return false;
+      return FailureDetector.ServerState.UNHEALTHY;
     }
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
index 088ddfc6f0..5d7e7c3b51 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
@@ -48,7 +48,7 @@ public abstract class 
BaseExponentialBackoffRetryFailureDetector implements Fail
   protected final ConcurrentHashMap<String, RetryInfo> 
_unhealthyServerRetryInfoMap = new ConcurrentHashMap<>();
   protected final DelayQueue<RetryInfo> _retryInfoDelayQueue = new 
DelayQueue<>();
 
-  protected final List<Function<String, Boolean>> _unhealthyServerRetriers = 
new ArrayList<>();
+  protected final List<Function<String, ServerState>> _unhealthyServerRetriers 
= new ArrayList<>();
   protected Consumer<String> _healthyServerNotifier;
   protected Consumer<String> _unhealthyServerNotifier;
   protected BrokerMetrics _brokerMetrics;
@@ -74,7 +74,7 @@ public abstract class 
BaseExponentialBackoffRetryFailureDetector implements Fail
   }
 
   @Override
-  public void registerUnhealthyServerRetrier(Function<String, Boolean> 
unhealthyServerRetrier) {
+  public void registerUnhealthyServerRetrier(Function<String, ServerState> 
unhealthyServerRetrier) {
     _unhealthyServerRetriers.add(unhealthyServerRetrier);
   }
 
@@ -110,9 +110,11 @@ public abstract class 
BaseExponentialBackoffRetryFailureDetector implements Fail
           }
           LOGGER.info("Retry unhealthy server: {}", instanceId);
           boolean recovered = true;
-          for (Function<String, Boolean> unhealthyServerRetrier : 
_unhealthyServerRetriers) {
-            if (!unhealthyServerRetrier.apply(instanceId)) {
+          for (Function<String, ServerState> unhealthyServerRetrier : 
_unhealthyServerRetriers) {
+            ServerState serverState = unhealthyServerRetrier.apply(instanceId);
+            if (serverState == ServerState.UNHEALTHY) {
               recovered = false;
+              break;
             }
           }
           if (recovered) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
index 33d47e1e8c..3f389958b2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
@@ -45,9 +45,11 @@ public interface FailureDetector {
 
   /**
    * Registers a function that will be periodically called to retry unhealthy 
servers. The function is called with the
-   * instanceId of the unhealthy server and should return true if the server 
is now healthy, false otherwise.
+   * instanceId of the unhealthy server and should return {@link 
ServerState#HEALTHY} if the server is now healthy,
+   * {@link ServerState#UNHEALTHY} if the server is still unhealthy, and 
{@link ServerState#UNKNOWN} if the retrier
+   * does not know about this server.
    */
-  void registerUnhealthyServerRetrier(Function<String, Boolean> 
unhealthyServerRetrier);
+  void registerUnhealthyServerRetrier(Function<String, ServerState> 
unhealthyServerRetrier);
 
   /**
    * Registers a consumer that will be called with the instanceId of a server 
that is detected as healthy.
@@ -83,4 +85,10 @@ public interface FailureDetector {
    * Stops the failure detector.
    */
   void stop();
+
+  enum ServerState {
+    HEALTHY,
+    UNHEALTHY,
+    UNKNOWN
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
index 3b81d11b18..e6f68d62c0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
@@ -35,7 +35,7 @@ public class NoOpFailureDetector implements FailureDetector {
   }
 
   @Override
-  public void registerUnhealthyServerRetrier(Function<String, Boolean> 
unhealthyServerRetrier) {
+  public void registerUnhealthyServerRetrier(Function<String, ServerState> 
unhealthyServerRetrier) {
   }
 
   @Override
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java b/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
index 45dd435b55..8ed3043f18 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
@@ -139,37 +139,42 @@ public class ConnectionFailureDetectorTest {
 
   @Test
   public void testRetryWithMultipleUnhealthyServerRetriers() {
-    _unhealthyServerRetrier = new UnhealthyServerRetrier(7);
+    _unhealthyServerRetrier = new UnhealthyServerRetrier(5);
     _failureDetector.registerUnhealthyServerRetrier(_unhealthyServerRetrier);
 
-    UnhealthyServerRetrier unhealthyServerRetrier2 = new 
UnhealthyServerRetrier(8);
+    // This retrier will only be called after the first retrier starts 
returning HEALTHY. So we expect a total of 7
+    // failures and 8 retries until the server is marked as healthy again.
+    UnhealthyServerRetrier unhealthyServerRetrier2 = new 
UnhealthyServerRetrier(2);
     _failureDetector.registerUnhealthyServerRetrier(unhealthyServerRetrier2);
 
+    // Register a retrier that isn't aware of the failing server. This should 
not affect the retry process.
+    _failureDetector.registerUnhealthyServerRetrier(instanceId -> 
FailureDetector.ServerState.UNKNOWN);
+
     _failureDetector.markServerUnhealthy(INSTANCE_ID);
     verify(Collections.singleton(INSTANCE_ID), 1, 0);
 
     // Should retry until both unhealthy server retriers return that the 
server is healthy
     TestUtils.waitForCondition(aVoid -> {
       int numRetries = _unhealthyServerRetrier._retryUnhealthyServerCalled;
-      if (numRetries < 9) {
+      if (numRetries < 8) {
         // Avoid test flakiness by not making these assertions close to the 
end of the expected retry period
-        if (numRetries > 0 && numRetries <= 7) {
+        if (numRetries > 0 && numRetries <= 5) {
           assertEquals(_failureDetector.getUnhealthyServers(), 
Collections.singleton(INSTANCE_ID));
           assertEquals(MetricValueUtils.getGlobalGaugeValue(_brokerMetrics, 
BrokerGauge.UNHEALTHY_SERVERS), 1);
         }
         return false;
       }
-      assertEquals(numRetries, 9);
+      assertEquals(numRetries, 8);
       // There might be a small delay between the successful attempt and 
removing failed server from the unhealthy
       // servers. Perform a check instead of an assertion.
       return _failureDetector.getUnhealthyServers().isEmpty()
           && MetricValueUtils.getGaugeValue(_brokerMetrics, 
BrokerGauge.UNHEALTHY_SERVERS.getGaugeName()) == 0
           && _unhealthyServerNotifier._notifyUnhealthyServerCalled == 1
           && _healthyServerNotifier._notifyHealthyServerCalled == 1;
-    }, 5_000L, "Failed to get 5 retries");
+    }, 5_000L, "Failed to get 8 retries");
 
     // Verify no further retries
-    assertEquals(_unhealthyServerRetrier._retryUnhealthyServerCalled, 9);
+    assertEquals(_unhealthyServerRetrier._retryUnhealthyServerCalled, 8);
   }
 
   private void verify(Set<String> expectedUnhealthyServers, int 
expectedNotifyUnhealthyServerCalled,
@@ -206,7 +211,7 @@ public class ConnectionFailureDetectorTest {
     }
   }
 
-  private static class UnhealthyServerRetrier implements Function<String, 
Boolean> {
+  private static class UnhealthyServerRetrier implements Function<String, 
FailureDetector.ServerState> {
     int _retryUnhealthyServerCalled = 0;
     final int _numFailures;
 
@@ -215,10 +220,11 @@ public class ConnectionFailureDetectorTest {
     }
 
     @Override
-    public Boolean apply(String instanceId) {
+    public FailureDetector.ServerState apply(String instanceId) {
       assertEquals(instanceId, INSTANCE_ID);
       _retryUnhealthyServerCalled++;
-      return _retryUnhealthyServerCalled > _numFailures;
+      return _retryUnhealthyServerCalled > _numFailures ? 
FailureDetector.ServerState.HEALTHY
+          : FailureDetector.ServerState.UNHEALTHY;
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index d0177e86b0..1debcd2f75 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -169,6 +169,14 @@ public class QueryRouter {
     asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
   }
 
+  public boolean hasChannel(ServerInstance serverInstance) {
+    if (_serverChannelsTls != null) {
+      return 
_serverChannelsTls.hasChannel(serverInstance.toServerRoutingInstance(TableType.OFFLINE,
 true));
+    } else {
+      return 
_serverChannels.hasChannel(serverInstance.toServerRoutingInstance(TableType.OFFLINE,
 false));
+    }
+  }
+
   /**
    * Connects to the given server, returns {@code true} if the server is 
successfully connected.
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index c9fe068ed4..b1e0d8c60b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -133,6 +133,10 @@ public class ServerChannels {
         .sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance, 
requestBytes, timeoutMs);
   }
 
+  public boolean hasChannel(ServerRoutingInstance serverRoutingInstance) {
+    return _serverToChannelMap.containsKey(serverRoutingInstance);
+  }
+
   public void connect(ServerRoutingInstance serverRoutingInstance)
       throws InterruptedException, TimeoutException {
     _serverToChannelMap.computeIfAbsent(serverRoutingInstance, 
ServerChannel::new).connect();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index de347e7c07..9eb0297c7e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -217,16 +217,24 @@ public class QueryDispatcher {
     }
   }
 
-  public boolean checkConnectivityToInstance(String instanceId) {
+  public FailureDetector.ServerState checkConnectivityToInstance(String 
instanceId) {
     String hostnamePort = _instanceIdToHostnamePortMap.get(instanceId);
-    DispatchClient client = _dispatchClientMap.get(hostnamePort);
 
-    if (client == null) {
-      LOGGER.warn("No DispatchClient found for server with instanceId: {}", 
instanceId);
-      return false;
+    // Could occur if the cluster is only serving single-stage queries
+    if (hostnamePort == null) {
+      LOGGER.debug("No DispatchClient found for server with instanceId: {}", 
instanceId);
+      return FailureDetector.ServerState.UNKNOWN;
     }
 
-    return client.getChannel().getState(true) == ConnectivityState.READY;
+    DispatchClient client = _dispatchClientMap.get(hostnamePort);
+    ConnectivityState connectivityState = client.getChannel().getState(true);
+    if (connectivityState == ConnectivityState.READY) {
+      LOGGER.info("Successfully connected to server: {}", instanceId);
+      return FailureDetector.ServerState.HEALTHY;
+    } else {
+      LOGGER.info("Still can't connect to server: {}, current state: {}", 
instanceId, connectivityState);
+      return FailureDetector.ServerState.UNHEALTHY;
+    }
   }
 
   private boolean isQueryCancellationEnabled() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to