yyj8 commented on PR #22133:
URL: https://github.com/apache/pulsar/pull/22133#issuecomment-1971004758
> The idea for `ControlledClusterFailover` is to move the control of whether a cluster is "healthy" outside of the Pulsar scope. You just need to implement a HTTP service that will tell the clients where to connect.
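For comparison, the quoted `ControlledClusterFailover` approach only needs an endpoint that reports the currently active service URL. Below is a minimal sketch using the JDK's built-in `com.sun.net.httpserver.HttpServer`. The class name `FailoverUrlProvider`, the `/pulsar/active` path, and a response containing only a `serviceUrl` field are assumptions for illustration; any TLS or authentication fields the client may also read are omitted.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical URL provider: an external operator decides which cluster is healthy
// and calls setActiveServiceUrl(); clients poll the endpoint to learn where to connect.
class FailoverUrlProvider {
    private final AtomicReference<String> activeServiceUrl;
    private final HttpServer server;

    FailoverUrlProvider(int port, String initialServiceUrl) throws IOException {
        this.activeServiceUrl = new AtomicReference<>(initialServiceUrl);
        // Binds to loopback for this sketch; port 0 asks the OS for a free port.
        this.server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        // Assumed path and response shape: {"serviceUrl":"..."} with no TLS/auth fields.
        // The URL is not JSON-escaped; this assumes it contains no quotes or backslashes.
        server.createContext("/pulsar/active", exchange -> {
            byte[] body = ("{\"serviceUrl\":\"" + activeServiceUrl.get() + "\"}")
                    .getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
    }

    void start() { server.start(); }

    void stop() { server.stop(0); }

    int port() { return server.getAddress().getPort(); }

    void setActiveServiceUrl(String url) { activeServiceUrl.set(url); }
}
```

With this design, deciding which cluster is healthy lives entirely in whatever process calls `setActiveServiceUrl()`, which is the point of the quoted suggestion.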
The client class `AutoClusterFailover.java` already implements a periodic probe that checks whether the IP and port of the current service URL are reachable and, based on the result, decides whether to switch the cluster connection address. So we only need to add a new request type (`CommandHealthCheck`) to the probe logic to obtain the cluster health status. The check-and-update logic is as follows:
```
private void probeAndUpdateServiceUrl(List<String> targetServiceUrls,
                                      Map<String, Authentication> authentications,
                                      Map<String, String> tlsTrustCertsFilePaths,
                                      Map<String, String> tlsTrustStorePaths,
                                      Map<String, String> tlsTrustStorePasswords) {
    // The current cluster passed the probe: clear any pending failure window.
    if (probeAvailable(currentPulsarServiceUrl)) {
        failedTimestamp = -1;
        return;
    }

    long currentTimestamp = System.nanoTime();
    if (failedTimestamp == -1) {
        // First failed probe: start the failure window.
        failedTimestamp = currentTimestamp;
    } else if (currentTimestamp - failedTimestamp >= failoverDelayNs) {
        // Down for at least failoverDelayNs: switch to the first target that passes the probe.
        for (String targetServiceUrl : targetServiceUrls) {
            if (probeAvailable(targetServiceUrl)) {
                log.info("Current Pulsar service is {}, it has been down for {} ms, "
                                + "switch to the service {}. The current service down at {}",
                        currentPulsarServiceUrl,
                        nanosToMillis(currentTimestamp - failedTimestamp),
                        targetServiceUrl, failedTimestamp);
                updateServiceUrl(targetServiceUrl,
                        authentications != null ? authentications.get(targetServiceUrl) : null,
                        tlsTrustCertsFilePaths != null ? tlsTrustCertsFilePaths.get(targetServiceUrl) : null,
                        tlsTrustStorePaths != null ? tlsTrustStorePaths.get(targetServiceUrl) : null,
                        tlsTrustStorePasswords != null ? tlsTrustStorePasswords.get(targetServiceUrl) : null);
                failedTimestamp = -1;
                break;
            } else {
                log.warn("Current Pulsar service is {}, it has been down for {} ms. "
                                + "Failed to switch to service {}, "
                                + "because it is not available, continue to probe next pulsar service.",
                        currentPulsarServiceUrl,
                        nanosToMillis(currentTimestamp - failedTimestamp), targetServiceUrl);
            }
        }
    }
}
```
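The timing rule in `probeAndUpdateServiceUrl()` can be pulled out into a small state machine and unit-tested without a real cluster. A minimal sketch, assuming a hypothetical `FailoverDecider` class in which the clock value and the health check are passed in (in the method above they come from `System.nanoTime()` and `probeAvailable()`):

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical extraction of the failover timing rule: stay on the current URL while it is
// healthy; once it has been unhealthy for at least failoverDelay, switch to the first
// healthy target. Time is passed in explicitly so the rule is deterministic in tests.
class FailoverDecider {
    private final long failoverDelayNs;
    private long failedTimestamp = -1; // -1 means "no failure window open", as in the original
    private String currentUrl;

    FailoverDecider(String initialUrl, long failoverDelay, TimeUnit unit) {
        this.currentUrl = initialUrl;
        this.failoverDelayNs = unit.toNanos(failoverDelay);
    }

    // Runs one probe round at time nowNs and returns the URL the client should use.
    String onProbe(long nowNs, List<String> targets, Predicate<String> isHealthy) {
        if (isHealthy.test(currentUrl)) {
            failedTimestamp = -1;        // healthy again: reset the failure window
            return currentUrl;
        }
        if (failedTimestamp == -1) {
            failedTimestamp = nowNs;     // first failed probe: open the window
        } else if (nowNs - failedTimestamp >= failoverDelayNs) {
            for (String target : targets) {
                if (isHealthy.test(target)) {
                    currentUrl = target; // corresponds to updateServiceUrl(...)
                    failedTimestamp = -1;
                    break;
                }
            }
        }
        return currentUrl;
    }
}
```

Because a switch only happens after the delay has elapsed, a single failed probe (for example, a brief network blip) does not move clients to another cluster.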
Then, in the probe method, add the new request that obtains the cluster health status:
```
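boolean probeAvailable(String url) {
    try {
        resolver.updateServiceUrl(url);
        InetSocketAddress endpoint = resolver.resolveHost();
        InetSocketAddress address = new InetSocketAddress(endpoint.getHostName(), endpoint.getPort());

        // Existing check: TCP reachability. try-with-resources closes the socket even on failure.
        try (Socket socket = new Socket()) {
            socket.connect(address, TIMEOUT);
        }

        // Newly added code: ask the broker for the cluster health status.
        // Commands.newHealthCheck() and ClientCnx#isClusterAvailable() are the proposed additions.
        ClientCnx clientCnx = pulsarClient.getCnxPool()
                .getConnection(address)
                .get(TIMEOUT, TimeUnit.MILLISECONDS);
        // sync() only waits until the request is flushed; isClusterAvailable() must
        // reflect the health-check response, which arrives asynchronously.
        clientCnx.ctx().writeAndFlush(Commands.newHealthCheck()).sync();
        return clientCnx.isClusterAvailable();
    } catch (Exception e) {
        log.warn("Failed to probe available, url: {}", url, e);
        return false;
    }
}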
```
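Because `writeAndFlush(...).sync()` only waits until the request has been written, the health-check result has to be awaited separately, with a bound so a hung broker cannot stall the probe loop. A minimal sketch, assuming the proposed health-check call is exposed as a hypothetical supplier returning a `CompletableFuture<Boolean>` that the connection's response handler completes; `HealthProbe` is also a hypothetical name:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical application-level half of probeAvailable(): send the health check and wait
// a bounded time for the response. A timeout, failure, or "false" all count as unavailable,
// mirroring the catch block in the original method.
final class HealthProbe {
    private HealthProbe() {}

    static boolean probe(Supplier<CompletableFuture<Boolean>> sendHealthCheck,
                         long timeout, TimeUnit unit) {
        try {
            return Boolean.TRUE.equals(sendHealthCheck.get().get(timeout, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (Exception e) { // ExecutionException, TimeoutException, RuntimeException
            return false;
        }
    }
}
```

Treating a timeout like an unhealthy response keeps the probe's contract simple: `true` only when the broker explicitly reports the cluster as available.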