yyj8 commented on PR #22133:
URL: https://github.com/apache/pulsar/pull/22133#issuecomment-1971004758

   > The idea for `ControlledClusterFailover` is to move the control of whether 
a cluster is "healthy" outside of the Pulsar scope. You just need to implement 
a HTTP service that will tell the clients where to connect.
   
   The client class `AutoClusterFailover.java `  has implemented a timed 
detection service to check if the IP and port are in a normal state, and based 
on the detection results, determine whether it is necessary to switch cluster 
connection addresses. So, we only need to add a request type 
(CommandHealthCheck) in the detection logic to obtain the cluster health status.
   
   Check and update the logic as follows:
   ```
     private void probeAndUpdateServiceUrl(List<String> targetServiceUrls,
                                             Map<String, Authentication> 
authentications,
                                             Map<String, String> 
tlsTrustCertsFilePaths,
                                             Map<String, String> 
tlsTrustStorePaths,
                                             Map<String, String> 
tlsTrustStorePasswords) {
           if (probeAvailable(currentPulsarServiceUrl)) {
               failedTimestamp = -1;
               return;
           }
   
           long currentTimestamp = System.nanoTime();
           if (failedTimestamp == -1) {
               failedTimestamp = currentTimestamp;
           } else if (currentTimestamp - failedTimestamp >= failoverDelayNs) {
               for (String targetServiceUrl : targetServiceUrls) {
                   if (probeAvailable(targetServiceUrl)) {
                       log.info("Current Pulsar service is {}, it has been down 
for {} ms, "
                                       + "switch to the service {}. The current 
service down at {}",
                               currentPulsarServiceUrl, 
nanosToMillis(currentTimestamp - failedTimestamp),
                               targetServiceUrl, failedTimestamp);
                       updateServiceUrl(targetServiceUrl,
                               authentications != null ? 
authentications.get(targetServiceUrl) : null,
                               tlsTrustCertsFilePaths != null ? 
tlsTrustCertsFilePaths.get(targetServiceUrl) : null,
                               tlsTrustStorePaths != null ? 
tlsTrustStorePaths.get(targetServiceUrl) : null,
                               tlsTrustStorePasswords != null ? 
tlsTrustStorePasswords.get(targetServiceUrl) : null);
                       failedTimestamp = -1;
                       break;
                   } else {
                       log.warn("Current Pulsar service is {}, it has been down 
for {} ms. "
                                       + "Failed to switch to service {}, "
                                       + "because it is not available, continue 
to probe next pulsar service.",
                           currentPulsarServiceUrl, 
nanosToMillis(currentTimestamp - failedTimestamp), targetServiceUrl);
                   }
               }
           }
       }
   ```
   
   Then, in the probe method, add a newly defined request logic for obtaining 
cluster health status.
   ```
   boolean probeAvailable(String url) {
           try {
               resolver.updateServiceUrl(url);
               InetSocketAddress endpoint = resolver.resolveHost();
               Socket socket = new Socket();
               socket.connect(new InetSocketAddress(endpoint.getHostName(), 
endpoint.getPort()), TIMEOUT);
               socket.close();
   
               //Below is the newly added code
               ClientCnx clientCnx = pulsarClient.getCnxPool()
                       .getConnection(new 
InetSocketAddress(endpoint.getHostName(), endpoint.getPort()))
                       .get();
               clientCnx.ctx().writeAndFlush(Commands.newHealthCheck()).sync();
   
               return clientCnx.isClusterAvailable();
           } catch (Exception e) {
               log.warn("Failed to probe available, url: {}", url, e);
               return false;
           }
       }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to