This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 726cf68715 Update FailureDetector recovery logic to not break if only
one query engine is being used (#15086)
726cf68715 is described below
commit 726cf68715c6b97b42b6ad829cf2aed1ee60c725
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Feb 20 22:23:56 2025 +0530
Update FailureDetector recovery logic to not break if only one query engine
is being used (#15086)
---
.../requesthandler/GrpcBrokerRequestHandler.java | 22 ++++++++++++------
.../MultiStageBrokerRequestHandler.java | 4 ++--
.../SingleConnectionBrokerRequestHandler.java | 15 +++++++++----
...BaseExponentialBackoffRetryFailureDetector.java | 10 +++++----
.../common/failuredetector/FailureDetector.java | 12 ++++++++--
.../failuredetector/NoOpFailureDetector.java | 2 +-
.../ConnectionFailureDetectorTest.java | 26 +++++++++++++---------
.../apache/pinot/core/transport/QueryRouter.java | 8 +++++++
.../pinot/core/transport/ServerChannels.java | 4 ++++
.../query/service/dispatch/QueryDispatcher.java | 20 ++++++++++++-----
10 files changed, 87 insertions(+), 36 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index d192a6034c..7f90f5692a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -166,22 +166,30 @@ public class GrpcBrokerRequestHandler extends
BaseSingleStageBrokerRequestHandle
/**
* Check if a server that was previously detected as unhealthy is now
healthy.
*/
- private boolean retryUnhealthyServer(String instanceId) {
+ private FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
LOGGER.info("Checking gRPC connection to unhealthy server: {}",
instanceId);
ServerInstance serverInstance =
_routingManager.getEnabledServerInstanceMap().get(instanceId);
if (serverInstance == null) {
LOGGER.info("Failed to find enabled server: {} in routing manager,
skipping the retry", instanceId);
- return false;
+ return FailureDetector.ServerState.UNHEALTHY;
}
String hostnamePort =
_streamingQueryClient._instanceIdToHostnamePortMap.get(instanceId);
+ // Could occur if the cluster is only serving multi-stage queries
+ if (hostnamePort == null) {
+ LOGGER.debug("No GrpcQueryClient found for server with instanceId: {}",
instanceId);
+ return FailureDetector.ServerState.UNKNOWN;
+ }
+
ServerGrpcQueryClient client =
_streamingQueryClient._grpcQueryClientMap.get(hostnamePort);
- if (client == null) {
- LOGGER.warn("No GrpcQueryClient found for server with instanceId: {}",
instanceId);
- return false;
+ ConnectivityState connectivityState = client.getChannel().getState(true);
+ if (connectivityState == ConnectivityState.READY) {
+ LOGGER.info("Successfully connected to server: {}", instanceId);
+ return FailureDetector.ServerState.HEALTHY;
+ } else {
+ LOGGER.info("Still can't connect to server: {}, current state: {}",
instanceId, connectivityState);
+ return FailureDetector.ServerState.UNHEALTHY;
}
-
- return client.getChannel().getState(true) == ConnectivityState.READY;
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d17361926d..fac9c86562 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -551,12 +551,12 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
/**
* Check if a server that was previously detected as unhealthy is now
healthy.
*/
- public boolean retryUnhealthyServer(String instanceId) {
+ public FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
LOGGER.info("Checking gRPC connection to unhealthy server: {}",
instanceId);
ServerInstance serverInstance =
_routingManager.getEnabledServerInstanceMap().get(instanceId);
if (serverInstance == null) {
LOGGER.info("Failed to find enabled server: {} in routing manager,
skipping the retry", instanceId);
- return false;
+ return FailureDetector.ServerState.UNHEALTHY;
}
return _queryDispatcher.checkConnectivityToInstance(instanceId);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 28143565a1..98ef386d7c 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -179,19 +179,26 @@ public class SingleConnectionBrokerRequestHandler extends
BaseSingleStageBrokerR
/**
* Check if a server that was previously detected as unhealthy is now
healthy.
*/
- public boolean retryUnhealthyServer(String instanceId) {
+ public FailureDetector.ServerState retryUnhealthyServer(String instanceId) {
LOGGER.info("Retrying unhealthy server: {}", instanceId);
ServerInstance serverInstance =
_routingManager.getEnabledServerInstanceMap().get(instanceId);
+
if (serverInstance == null) {
LOGGER.info("Failed to find enabled server: {} in routing manager,
skipping the retry", instanceId);
- return false;
+ return FailureDetector.ServerState.UNHEALTHY;
+ }
+
+ // Could occur if the cluster is only serving multi-stage queries
+ if (!_queryRouter.hasChannel(serverInstance)) {
+ return FailureDetector.ServerState.UNKNOWN;
}
+
if (_queryRouter.connect(serverInstance)) {
LOGGER.info("Successfully connect to server: {}, marking it healthy",
instanceId);
- return true;
+ return FailureDetector.ServerState.HEALTHY;
} else {
LOGGER.warn("Still cannot connect to server: {}, retry later",
instanceId);
- return false;
+ return FailureDetector.ServerState.UNHEALTHY;
}
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
index 088ddfc6f0..5d7e7c3b51 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/BaseExponentialBackoffRetryFailureDetector.java
@@ -48,7 +48,7 @@ public abstract class
BaseExponentialBackoffRetryFailureDetector implements Fail
protected final ConcurrentHashMap<String, RetryInfo>
_unhealthyServerRetryInfoMap = new ConcurrentHashMap<>();
protected final DelayQueue<RetryInfo> _retryInfoDelayQueue = new
DelayQueue<>();
- protected final List<Function<String, Boolean>> _unhealthyServerRetriers =
new ArrayList<>();
+ protected final List<Function<String, ServerState>> _unhealthyServerRetriers
= new ArrayList<>();
protected Consumer<String> _healthyServerNotifier;
protected Consumer<String> _unhealthyServerNotifier;
protected BrokerMetrics _brokerMetrics;
@@ -74,7 +74,7 @@ public abstract class
BaseExponentialBackoffRetryFailureDetector implements Fail
}
@Override
- public void registerUnhealthyServerRetrier(Function<String, Boolean>
unhealthyServerRetrier) {
+ public void registerUnhealthyServerRetrier(Function<String, ServerState>
unhealthyServerRetrier) {
_unhealthyServerRetriers.add(unhealthyServerRetrier);
}
@@ -110,9 +110,11 @@ public abstract class
BaseExponentialBackoffRetryFailureDetector implements Fail
}
LOGGER.info("Retry unhealthy server: {}", instanceId);
boolean recovered = true;
- for (Function<String, Boolean> unhealthyServerRetrier :
_unhealthyServerRetriers) {
- if (!unhealthyServerRetrier.apply(instanceId)) {
+ for (Function<String, ServerState> unhealthyServerRetrier :
_unhealthyServerRetriers) {
+ ServerState serverState = unhealthyServerRetrier.apply(instanceId);
+ if (serverState == ServerState.UNHEALTHY) {
recovered = false;
+ break;
}
}
if (recovered) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
index 33d47e1e8c..3f389958b2 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/FailureDetector.java
@@ -45,9 +45,11 @@ public interface FailureDetector {
/**
* Registers a function that will be periodically called to retry unhealthy
servers. The function is called with the
- * instanceId of the unhealthy server and should return true if the server
is now healthy, false otherwise.
+ * instanceId of the unhealthy server and should return {@link
ServerState#HEALTHY} if the server is now healthy,
+ * {@link ServerState#UNHEALTHY} if the server is still unhealthy, and
{@link ServerState#UNKNOWN} if the retrier
+ * does not know about this server.
*/
- void registerUnhealthyServerRetrier(Function<String, Boolean>
unhealthyServerRetrier);
+ void registerUnhealthyServerRetrier(Function<String, ServerState>
unhealthyServerRetrier);
/**
* Registers a consumer that will be called with the instanceId of a server
that is detected as healthy.
@@ -83,4 +85,10 @@ public interface FailureDetector {
* Stops the failure detector.
*/
void stop();
+
+ enum ServerState {
+ HEALTHY,
+ UNHEALTHY,
+ UNKNOWN
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
index 3b81d11b18..e6f68d62c0 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/failuredetector/NoOpFailureDetector.java
@@ -35,7 +35,7 @@ public class NoOpFailureDetector implements FailureDetector {
}
@Override
- public void registerUnhealthyServerRetrier(Function<String, Boolean>
unhealthyServerRetrier) {
+ public void registerUnhealthyServerRetrier(Function<String, ServerState>
unhealthyServerRetrier) {
}
@Override
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
index 45dd435b55..8ed3043f18 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/failuredetector/ConnectionFailureDetectorTest.java
@@ -139,37 +139,42 @@ public class ConnectionFailureDetectorTest {
@Test
public void testRetryWithMultipleUnhealthyServerRetriers() {
- _unhealthyServerRetrier = new UnhealthyServerRetrier(7);
+ _unhealthyServerRetrier = new UnhealthyServerRetrier(5);
_failureDetector.registerUnhealthyServerRetrier(_unhealthyServerRetrier);
- UnhealthyServerRetrier unhealthyServerRetrier2 = new
UnhealthyServerRetrier(8);
+ // This retrier will only be called after the first retrier starts
returning HEALTHY. So we expect a total of 7
+ // failures and 8 retries until the server is marked as healthy again.
+ UnhealthyServerRetrier unhealthyServerRetrier2 = new
UnhealthyServerRetrier(2);
_failureDetector.registerUnhealthyServerRetrier(unhealthyServerRetrier2);
+ // Register a retrier that isn't aware of the failing server. This should
not affect the retry process.
+ _failureDetector.registerUnhealthyServerRetrier(instanceId ->
FailureDetector.ServerState.UNKNOWN);
+
_failureDetector.markServerUnhealthy(INSTANCE_ID);
verify(Collections.singleton(INSTANCE_ID), 1, 0);
// Should retry until both unhealthy server retriers return that the
server is healthy
TestUtils.waitForCondition(aVoid -> {
int numRetries = _unhealthyServerRetrier._retryUnhealthyServerCalled;
- if (numRetries < 9) {
+ if (numRetries < 8) {
// Avoid test flakiness by not making these assertions close to the
end of the expected retry period
- if (numRetries > 0 && numRetries <= 7) {
+ if (numRetries > 0 && numRetries <= 5) {
assertEquals(_failureDetector.getUnhealthyServers(),
Collections.singleton(INSTANCE_ID));
assertEquals(MetricValueUtils.getGlobalGaugeValue(_brokerMetrics,
BrokerGauge.UNHEALTHY_SERVERS), 1);
}
return false;
}
- assertEquals(numRetries, 9);
+ assertEquals(numRetries, 8);
// There might be a small delay between the successful attempt and
removing failed server from the unhealthy
// servers. Perform a check instead of an assertion.
return _failureDetector.getUnhealthyServers().isEmpty()
&& MetricValueUtils.getGaugeValue(_brokerMetrics,
BrokerGauge.UNHEALTHY_SERVERS.getGaugeName()) == 0
&& _unhealthyServerNotifier._notifyUnhealthyServerCalled == 1
&& _healthyServerNotifier._notifyHealthyServerCalled == 1;
- }, 5_000L, "Failed to get 5 retries");
+ }, 5_000L, "Failed to get 8 retries");
// Verify no further retries
- assertEquals(_unhealthyServerRetrier._retryUnhealthyServerCalled, 9);
+ assertEquals(_unhealthyServerRetrier._retryUnhealthyServerCalled, 8);
}
private void verify(Set<String> expectedUnhealthyServers, int
expectedNotifyUnhealthyServerCalled,
@@ -206,7 +211,7 @@ public class ConnectionFailureDetectorTest {
}
}
- private static class UnhealthyServerRetrier implements Function<String,
Boolean> {
+ private static class UnhealthyServerRetrier implements Function<String,
FailureDetector.ServerState> {
int _retryUnhealthyServerCalled = 0;
final int _numFailures;
@@ -215,10 +220,11 @@ public class ConnectionFailureDetectorTest {
}
@Override
- public Boolean apply(String instanceId) {
+ public FailureDetector.ServerState apply(String instanceId) {
assertEquals(instanceId, INSTANCE_ID);
_retryUnhealthyServerCalled++;
- return _retryUnhealthyServerCalled > _numFailures;
+ return _retryUnhealthyServerCalled > _numFailures ?
FailureDetector.ServerState.HEALTHY
+ : FailureDetector.ServerState.UNHEALTHY;
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index d0177e86b0..1debcd2f75 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -169,6 +169,14 @@ public class QueryRouter {
asyncQueryResponse.markQueryFailed(serverRoutingInstance, e);
}
+ public boolean hasChannel(ServerInstance serverInstance) {
+ if (_serverChannelsTls != null) {
+ return
_serverChannelsTls.hasChannel(serverInstance.toServerRoutingInstance(TableType.OFFLINE,
true));
+ } else {
+ return
_serverChannels.hasChannel(serverInstance.toServerRoutingInstance(TableType.OFFLINE,
false));
+ }
+ }
+
/**
* Connects to the given server, returns {@code true} if the server is
successfully connected.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index c9fe068ed4..b1e0d8c60b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -133,6 +133,10 @@ public class ServerChannels {
.sendRequest(rawTableName, asyncQueryResponse, serverRoutingInstance,
requestBytes, timeoutMs);
}
+ public boolean hasChannel(ServerRoutingInstance serverRoutingInstance) {
+ return _serverToChannelMap.containsKey(serverRoutingInstance);
+ }
+
public void connect(ServerRoutingInstance serverRoutingInstance)
throws InterruptedException, TimeoutException {
_serverToChannelMap.computeIfAbsent(serverRoutingInstance,
ServerChannel::new).connect();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index de347e7c07..9eb0297c7e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -217,16 +217,24 @@ public class QueryDispatcher {
}
}
- public boolean checkConnectivityToInstance(String instanceId) {
+ public FailureDetector.ServerState checkConnectivityToInstance(String
instanceId) {
String hostnamePort = _instanceIdToHostnamePortMap.get(instanceId);
- DispatchClient client = _dispatchClientMap.get(hostnamePort);
- if (client == null) {
- LOGGER.warn("No DispatchClient found for server with instanceId: {}",
instanceId);
- return false;
+ // Could occur if the cluster is only serving single-stage queries
+ if (hostnamePort == null) {
+ LOGGER.debug("No DispatchClient found for server with instanceId: {}",
instanceId);
+ return FailureDetector.ServerState.UNKNOWN;
}
- return client.getChannel().getState(true) == ConnectivityState.READY;
+ DispatchClient client = _dispatchClientMap.get(hostnamePort);
+ ConnectivityState connectivityState = client.getChannel().getState(true);
+ if (connectivityState == ConnectivityState.READY) {
+ LOGGER.info("Successfully connected to server: {}", instanceId);
+ return FailureDetector.ServerState.HEALTHY;
+ } else {
+ LOGGER.info("Still can't connect to server: {}, current state: {}",
instanceId, connectivityState);
+ return FailureDetector.ServerState.UNHEALTHY;
+ }
}
private boolean isQueryCancellationEnabled() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org