This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9fd5b0862002b349d961a649c5c714bf01055a56
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 13 00:47:04 2024 +0800

    [improve] [client]Add new ServiceUrlProvider implementation: 
SameAuthParamsAutoClusterFailover (#23129)
    
    (cherry picked from commit 06a2f5cc63c126a0262bec4602b81c1c716e4e36)
---
 ...ameAuthParamsLookupAutoClusterFailoverTest.java | 176 +++++++++++
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   2 +-
 .../broker/service/NetworkErrorTestBase.java       |   2 +-
 .../broker/service/OneWayReplicatorTestBase.java   |  21 +-
 .../pulsar/client/api/ServiceUrlProvider.java      |   2 +-
 .../SameAuthParamsLookupAutoClusterFailover.java   | 341 +++++++++++++++++++++
 .../client/impl/AutoClusterFailoverTest.java       |  12 +-
 .../client/impl/ControlledClusterFailoverTest.java |   5 +-
 8 files changed, 546 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
new file mode 100644
index 00000000000..b39f8135e0e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient;
+import static 
org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
+import io.netty.channel.EventLoopGroup;
+import java.net.ServerSocket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.NetworkErrorTestBase;
+import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class SameAuthParamsLookupAutoClusterFailoverTest extends 
OneWayReplicatorTestBase {
+
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @DataProvider(name = "enabledTls")
+    public Object[][] enabledTls () {
+        return new Object[][] {
+            {true},
+            {false}
+        };
+    }
+
+    @Test(dataProvider = "enabledTls", timeOut = 240 * 1000)
+    public void testAutoClusterFailover(boolean enabledTls) throws Exception {
+        // Start clusters.
+        setup();
+        ServerSocket dummyServer = new 
ServerSocket(NetworkErrorTestBase.getOneFreePort());
+
+        // Initialize client.
+        String urlProxy = enabledTls ? "pulsar+tls://127.0.0.1:" + 
dummyServer.getLocalPort()
+                : "pulsar://127.0.0.1:" + dummyServer.getLocalPort();
+        String url1 = enabledTls ? pulsar1.getBrokerServiceUrlTls() : 
pulsar1.getBrokerServiceUrl();
+        String url2 = enabledTls ? pulsar2.getBrokerServiceUrlTls() : 
pulsar2.getBrokerServiceUrl();
+        final String[] urlArray = new String[]{url1, urlProxy, url2};
+        final SameAuthParamsLookupAutoClusterFailover failover = 
SameAuthParamsLookupAutoClusterFailover.builder()
+                .pulsarServiceUrlArray(urlArray)
+                .failoverThreshold(5)
+                .recoverThreshold(5)
+                .checkHealthyIntervalMs(300)
+                .testTopic("a/b/c")
+                .markTopicNotFoundAsAvailable(true)
+                .build();
+        ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrlProvider(failover);
+        if (enabledTls) {
+            Map<String, String> authParams = new HashMap<>();
+            authParams.put("tlsCertFile", getTlsFileForClient("admin.cert"));
+            authParams.put("tlsKeyFile", getTlsFileForClient("admin.key-pk8"));
+            clientBuilder.authentication(AuthenticationTls.class.getName(), 
authParams)
+                .enableTls(true)
+                .allowTlsInsecureConnection(false)
+                .tlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+        }
+        final PulsarClient client = clientBuilder.build();
+        failover.initialize(client);
+        final EventLoopGroup executor = 
WhiteboxImpl.getInternalState(failover, "executor");
+        final PulsarServiceState[] stateArray =
+                WhiteboxImpl.getInternalState(failover, 
"pulsarServiceStateArray");
+
+        // Test all things is fine.
+        final String tp = BrokerTestUtil.newUniqueName(nonReplicatedNamespace 
+ "/tp");
+        final Producer<String> producer = 
client.newProducer(Schema.STRING).topic(tp).create();
+        producer.send("0");
+        Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
+
+        CompletableFuture<Boolean> checkStatesFuture1 = new 
CompletableFuture<>();
+        executor.submit(() -> {
+            boolean res = stateArray[0] == PulsarServiceState.Healthy;
+            res = res & stateArray[1] == PulsarServiceState.Healthy;
+            res = res & stateArray[2] == PulsarServiceState.Healthy;
+            checkStatesFuture1.complete(res);
+        });
+        Assert.assertTrue(checkStatesFuture1.join());
+
+        // Test failover 0 --> 3.
+        pulsar1.close();
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+            CompletableFuture<Boolean> checkStatesFuture2 = new 
CompletableFuture<>();
+            executor.submit(() -> {
+                boolean res = stateArray[0] == PulsarServiceState.Failed;
+                res = res & stateArray[1] == PulsarServiceState.Failed;
+                res = res & stateArray[2] == PulsarServiceState.Healthy;
+                checkStatesFuture2.complete(res);
+            });
+            Assert.assertTrue(checkStatesFuture2.join());
+            producer.send("0->2");
+            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
+        });
+
+        // Test recover 2 --> 1.
+        executor.execute(() -> {
+            urlArray[1] = url2;
+        });
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+            CompletableFuture<Boolean> checkStatesFuture3 = new 
CompletableFuture<>();
+            executor.submit(() -> {
+                boolean res = stateArray[0] == PulsarServiceState.Failed;
+                res = res & stateArray[1] == PulsarServiceState.Healthy;
+                res = res & stateArray[2] == PulsarServiceState.Healthy;
+                checkStatesFuture3.complete(res);
+            });
+            Assert.assertTrue(checkStatesFuture3.join());
+            producer.send("2->1");
+            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1);
+        });
+
+        // Test recover 1 --> 0.
+        executor.execute(() -> {
+            urlArray[0] = url2;
+        });
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+            CompletableFuture<Boolean> checkStatesFuture4 = new 
CompletableFuture<>();
+            executor.submit(() -> {
+                boolean res = stateArray[0] == PulsarServiceState.Healthy;
+                res = res & stateArray[1] == PulsarServiceState.Healthy;
+                res = res & stateArray[2] == PulsarServiceState.Healthy;
+                checkStatesFuture4.complete(res);
+            });
+            Assert.assertTrue(checkStatesFuture4.join());
+            producer.send("1->0");
+            Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
+        });
+
+        // cleanup.
+        producer.close();
+        client.close();
+        dummyServer.close();
+    }
+
+    @Override
+    protected void cleanupPulsarResources() {
+        // Nothing to do.
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index eef4469aa95..e155e399e24 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -84,7 +84,7 @@ import org.testng.annotations.DataProvider;
 public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
     // All certificate-authority files are copied from the 
tests/certificate-authority directory and all share the same
     // root CA.
-    protected static String getTlsFileForClient(String name) {
+    public static String getTlsFileForClient(String name) {
         return 
ResourceUtils.getAbsolutePath(String.format("certificate-authority/client-keys/%s.pem",
 name));
     }
     public final static String CA_CERT_FILE_PATH =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
index 36f8cb47612..742194d9b12 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -102,7 +102,7 @@ public abstract class NetworkErrorTestBase extends 
TestRetrySupport {
         log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), 
broker2.getListenPort());
     }
 
-    protected int getOneFreePort() throws IOException {
+    public static int getOneFreePort() throws IOException {
         ServerSocket serverSocket = new ServerSocket(0);
         int port = serverSocket.getLocalPort();
         serverSocket.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index d66e666e3a0..f3076ebdec6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_CERT_FILE_PATH;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH;
+import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -267,10 +270,18 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
         config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
         config.setLoadBalancerSheddingEnabled(false);
         config.setForceDeleteNamespaceAllowed(true);
+        config.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+        config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+        config.setClusterName(clusterName);
+        config.setTlsRequireTrustedClientCertOnConnect(false);
+        Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+        tlsProtocols.add("TLSv1.3");
+        tlsProtocols.add("TLSv1.2");
+        config.setTlsProtocols(tlsProtocols);
     }
 
-    @Override
-    protected void cleanup() throws Exception {
+    protected void cleanupPulsarResources() throws Exception {
         // delete namespaces.
         waitChangeEventsInit(replicatedNamespace);
         
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, 
Sets.newHashSet(cluster1));
@@ -283,6 +294,12 @@ public abstract class OneWayReplicatorTestBase extends 
TestRetrySupport {
             admin2.namespaces().deleteNamespace(replicatedNamespace, true);
             admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
         }
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        // cleanup pulsar resources.
+        cleanupPulsarResources();
 
         // shutdown.
         markCurrentSetupNumberCleaned();
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index 5cb22276553..e8b513b103f 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -56,7 +56,7 @@ public interface ServiceUrlProvider extends AutoCloseable {
      *
      */
     @Override
-    default void close() {
+    default void close() throws Exception {
         // do nothing
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
new file mode 100644
index 00000000000..4beff4719c8
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+@Slf4j
+@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"})
+public class SameAuthParamsLookupAutoClusterFailover implements 
ServiceUrlProvider {
+
+    private PulsarClientImpl pulsarClient;
+    private EventLoopGroup executor;
+    private volatile boolean closed;
+    private ScheduledFuture<?> scheduledCheckTask;
+    @Getter
+    private int failoverThreshold = 5;
+    @Getter
+    private int recoverThreshold = 5;
+    @Getter
+    private long checkHealthyIntervalMs = 1000;
+    @Getter
+    private boolean markTopicNotFoundAsAvailable = true;
+    @Getter
+    private String testTopic = "public/default/tp_test";
+
+    private String[] pulsarServiceUrlArray;
+    private PulsarServiceState[] pulsarServiceStateArray;
+    private MutableInt[] checkCounterArray;
+    @Getter
+    private volatile int currentPulsarServiceIndex;
+
+    private SameAuthParamsLookupAutoClusterFailover() {}
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.currentPulsarServiceIndex = 0;
+        this.pulsarClient = (PulsarClientImpl) client;
+        this.executor = EventLoopUtil.newEventLoopGroup(1, false,
+                new 
ExecutorProvider.ExtendedThreadFactory("broker-service-url-check"));
+        scheduledCheckTask = executor.scheduleAtFixedRate(() -> {
+            if (closed) {
+                return;
+            }
+            checkPulsarServices();
+            int firstHealthyPulsarService = firstHealthyPulsarService();
+            if (firstHealthyPulsarService == currentPulsarServiceIndex) {
+                return;
+            }
+            if (firstHealthyPulsarService < 0) {
+                int failoverTo = findFailoverTo();
+                if (failoverTo < 0) {
+                    // No healthy pulsar service to connect.
+                    log.error("Failed to choose a pulsar service to connect, 
no one pulsar service is healthy. Current"
+                            + " pulsar service: [{}] {}. States: {}, Counters: 
{}", currentPulsarServiceIndex,
+                            pulsarServiceUrlArray[currentPulsarServiceIndex], 
Arrays.toString(pulsarServiceStateArray),
+                            Arrays.toString(checkCounterArray));
+                } else {
+                    // Failover to low priority pulsar service.
+                    updateServiceUrl(failoverTo);
+                }
+            } else {
+                // Back to high priority pulsar service.
+                updateServiceUrl(firstHealthyPulsarService);
+            }
+        }, checkHealthyIntervalMs, checkHealthyIntervalMs, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public String getServiceUrl() {
+        return pulsarServiceUrlArray[currentPulsarServiceIndex];
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.info("Closing service url provider. Current pulsar service: [{}] 
{}", currentPulsarServiceIndex,
+                pulsarServiceUrlArray[currentPulsarServiceIndex]);
+        closed = true;
+        scheduledCheckTask.cancel(false);
+        executor.shutdownNow();
+    }
+
+    private int firstHealthyPulsarService() {
+        for (int i = 0; i <= currentPulsarServiceIndex; i++) {
+            if (pulsarServiceStateArray[i] == PulsarServiceState.Healthy
+                    || pulsarServiceStateArray[i] == 
PulsarServiceState.PreFail) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private int findFailoverTo() {
+        for (int i = currentPulsarServiceIndex + 1; i <= 
pulsarServiceUrlArray.length; i++) {
+            if (probeAvailable(i)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private void checkPulsarServices() {
+        for (int i = 0; i <= currentPulsarServiceIndex; i++) {
+            if (probeAvailable(i)) {
+                switch (pulsarServiceStateArray[i]) {
+                    case Healthy: {
+                        break;
+                    }
+                    case PreFail: {
+                        pulsarServiceStateArray[i] = 
PulsarServiceState.Healthy;
+                        checkCounterArray[i].setValue(0);
+                        break;
+                    }
+                    case Failed: {
+                        pulsarServiceStateArray[i] = 
PulsarServiceState.PreRecover;
+                        checkCounterArray[i].setValue(1);
+                        break;
+                    }
+                    case PreRecover: {
+                        
checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1);
+                        if (checkCounterArray[i].getValue() >= 
recoverThreshold) {
+                            pulsarServiceStateArray[i] = 
PulsarServiceState.Healthy;
+                            checkCounterArray[i].setValue(0);
+                        }
+                        break;
+                    }
+                }
+            } else {
+                switch (pulsarServiceStateArray[i]) {
+                    case Healthy: {
+                        pulsarServiceStateArray[i] = 
PulsarServiceState.PreFail;
+                        checkCounterArray[i].setValue(1);
+                        break;
+                    }
+                    case PreFail: {
+                        
checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1);
+                        if (checkCounterArray[i].getValue() >= 
failoverThreshold) {
+                            pulsarServiceStateArray[i] = 
PulsarServiceState.Failed;
+                            checkCounterArray[i].setValue(0);
+                        }
+                        break;
+                    }
+                    case Failed: {
+                        break;
+                    }
+                    case PreRecover: {
+                        pulsarServiceStateArray[i] = PulsarServiceState.Failed;
+                        checkCounterArray[i].setValue(0);
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean probeAvailable(int brokerServiceIndex) {
+        String url = pulsarServiceUrlArray[brokerServiceIndex];
+        try {
+            LookupTopicResult res = 
pulsarClient.getLookup(url).getBroker(TopicName.get(testTopic))
+                    .get(3, TimeUnit.SECONDS);
+            if (log.isDebugEnabled()) {
+                log.debug("Success to probe available(lookup res: {}), [{}] 
{}}. States: {}, Counters: {}",
+                        res.toString(), brokerServiceIndex, url, 
Arrays.toString(pulsarServiceStateArray),
+                        Arrays.toString(checkCounterArray));
+            }
+            return true;
+        } catch (Exception e) {
+            Throwable actEx = FutureUtil.unwrapCompletionException(e);
+            if (actEx instanceof PulsarAdminException.NotFoundException
+                    || actEx instanceof PulsarClientException.NotFoundException
+                    || actEx instanceof 
PulsarClientException.TopicDoesNotExistException
+                    || actEx instanceof PulsarClientException.LookupException) 
{
+                if (markTopicNotFoundAsAvailable) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Success to probe available(case 
tenant/namespace/topic not found), [{}] {}."
+                                + " States: {}, Counters: {}", 
brokerServiceIndex, url,
+                                Arrays.toString(pulsarServiceStateArray), 
Arrays.toString(checkCounterArray));
+                    }
+                    return true;
+                } else {
+                    log.warn("Failed to probe available(error 
tenant/namespace/topic not found), [{}] {}. States: {},"
+                            + " Counters: {}", brokerServiceIndex, url, 
Arrays.toString(pulsarServiceStateArray),
+                            Arrays.toString(checkCounterArray));
+                    return false;
+                }
+            }
+            log.warn("Failed to probe available, [{}] {}. States: {}, 
Counters: {}", brokerServiceIndex, url,
+                    Arrays.toString(pulsarServiceStateArray), 
Arrays.toString(checkCounterArray));
+            return false;
+        }
+    }
+
+    private void updateServiceUrl(int targetIndex) {
+        String currentUrl = pulsarServiceUrlArray[currentPulsarServiceIndex];
+        String targetUrl = pulsarServiceUrlArray[targetIndex];
+        String logMsg;
+        if (targetIndex < currentPulsarServiceIndex) {
+            logMsg = String.format("Recover to high priority pulsar service 
[%s] %s --> [%s] %s. States: %s,"
+                    + " Counters: %s", currentPulsarServiceIndex, currentUrl, 
targetIndex, targetUrl,
+                    Arrays.toString(pulsarServiceStateArray), 
Arrays.toString(checkCounterArray));
+        } else {
+            logMsg = String.format("Failover to low priority pulsar service 
[%s] %s --> [%s] %s. States: %s,"
+                    + " Counters: %s", currentPulsarServiceIndex, currentUrl, 
targetIndex, targetUrl,
+                    Arrays.toString(pulsarServiceStateArray), 
Arrays.toString(checkCounterArray));
+        }
+        log.info(logMsg);
+        try {
+            pulsarClient.updateServiceUrl(targetUrl);
+            pulsarClient.reloadLookUp();
+            currentPulsarServiceIndex = targetIndex;
+        } catch (Exception e) {
+            log.error("Failed to {}", logMsg, e);
+        }
+    }
+
+    public enum PulsarServiceState {
+        Healthy,
+        PreFail,
+        Failed,
+        PreRecover;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private SameAuthParamsLookupAutoClusterFailover
+                sameAuthParamsLookupAutoClusterFailover = new 
SameAuthParamsLookupAutoClusterFailover();
+
+        public Builder failoverThreshold(int failoverThreshold) {
+            if (failoverThreshold < 1) {
+                throw new IllegalArgumentException("failoverThreshold must be 
larger than 0");
+            }
+            sameAuthParamsLookupAutoClusterFailover.failoverThreshold = 
failoverThreshold;
+            return this;
+        }
+
+        public Builder recoverThreshold(int recoverThreshold) {
+            if (recoverThreshold < 1) {
+                throw new IllegalArgumentException("recoverThreshold must be 
larger than 0");
+            }
+            sameAuthParamsLookupAutoClusterFailover.recoverThreshold = 
recoverThreshold;
+            return this;
+        }
+
+        public Builder checkHealthyIntervalMs(int checkHealthyIntervalMs) {
+            if (checkHealthyIntervalMs < 1) {
+                throw new IllegalArgumentException("checkHealthyIntervalMs 
must be larger than 0");
+            }
+            sameAuthParamsLookupAutoClusterFailover.checkHealthyIntervalMs = 
checkHealthyIntervalMs;
+            return this;
+        }
+
+        public Builder testTopic(String testTopic) {
+            if (StringUtils.isBlank(testTopic) && TopicName.get(testTopic) != 
null) {
+                throw new IllegalArgumentException("testTopic can not be 
blank");
+            }
+            sameAuthParamsLookupAutoClusterFailover.testTopic = testTopic;
+            return this;
+        }
+
+        public Builder markTopicNotFoundAsAvailable(boolean 
markTopicNotFoundAsAvailable) {
+            
sameAuthParamsLookupAutoClusterFailover.markTopicNotFoundAsAvailable = 
markTopicNotFoundAsAvailable;
+            return this;
+        }
+
+        public Builder pulsarServiceUrlArray(String[] pulsarServiceUrlArray) {
+            if (pulsarServiceUrlArray == null || pulsarServiceUrlArray.length 
== 0) {
+                throw new IllegalArgumentException("pulsarServiceUrlArray can 
not be empty");
+            }
+            sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray = 
pulsarServiceUrlArray;
+            int pulsarServiceLen = pulsarServiceUrlArray.length;
+            HashSet<String> uniqueChecker = new HashSet<>();
+            for (int i = 0; i < pulsarServiceLen; i++) {
+                String pulsarService = pulsarServiceUrlArray[i];
+                if (StringUtils.isBlank(pulsarService)) {
+                    throw new IllegalArgumentException("pulsarServiceUrlArray 
contains a blank value at index " + i);
+                }
+                if (pulsarService.startsWith("http") || 
pulsarService.startsWith("HTTP")) {
+                    throw new 
IllegalArgumentException("SameAuthParamsLookupAutoClusterFailover does not 
support HTTP"
+                            + " protocol pulsar service url so far.");
+                }
+                if (!uniqueChecker.add(pulsarService)) {
+                    throw new IllegalArgumentException("pulsarServiceUrlArray 
contains duplicated value "
+                            + pulsarServiceUrlArray[i]);
+                }
+            }
+            return this;
+        }
+
+        public SameAuthParamsLookupAutoClusterFailover build() {
+            String[] pulsarServiceUrlArray = 
sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray;
+            if (pulsarServiceUrlArray == null) {
+                throw new IllegalArgumentException("pulsarServiceUrlArray can 
not be empty");
+            }
+            int pulsarServiceLen = pulsarServiceUrlArray.length;
+            sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray = 
new PulsarServiceState[pulsarServiceLen];
+            sameAuthParamsLookupAutoClusterFailover.checkCounterArray = new 
MutableInt[pulsarServiceLen];
+            for (int i = 0; i < pulsarServiceLen; i++) {
+                
sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray[i] = 
PulsarServiceState.Healthy;
+                sameAuthParamsLookupAutoClusterFailover.checkCounterArray[i] = 
new MutableInt(0);
+            }
+            return sameAuthParamsLookupAutoClusterFailover;
+        }
+    }
+}
+
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 545cf7483e4..b275ffb6012 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -32,7 +31,6 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.awaitility.Awaitility;
@@ -43,7 +41,7 @@ import org.testng.annotations.Test;
 @Slf4j
 public class AutoClusterFailoverTest {
     @Test
-    public void testBuildAutoClusterFailoverInstance() throws 
PulsarClientException {
+    public void testBuildAutoClusterFailoverInstance() throws Exception {
         String primary = "pulsar://localhost:6650";
         String secondary = "pulsar://localhost:6651";
         long failoverDelay = 30;
@@ -106,7 +104,7 @@ public class AutoClusterFailoverTest {
     }
 
     @Test
-    public void testInitialize() {
+    public void testInitialize() throws Exception {
         String primary = "pulsar://localhost:6650";
         String secondary = "pulsar://localhost:6651";
         long failoverDelay = 10;
@@ -151,7 +149,7 @@ public class AutoClusterFailoverTest {
     }
 
     @Test
-    public void testAutoClusterFailoverSwitchWithoutAuthentication() {
+    public void testAutoClusterFailoverSwitchWithoutAuthentication() throws 
Exception {
         String primary = "pulsar://localhost:6650";
         String secondary = "pulsar://localhost:6651";
         long failoverDelay = 1;
@@ -187,7 +185,7 @@ public class AutoClusterFailoverTest {
     }
 
     @Test
-    public void testAutoClusterFailoverSwitchWithAuthentication() throws 
IOException {
+    public void testAutoClusterFailoverSwitchWithAuthentication() throws 
Exception {
         String primary = "pulsar+ssl://localhost:6651";
         String secondary = "pulsar+ssl://localhost:6661";
         long failoverDelay = 1;
@@ -251,7 +249,7 @@ public class AutoClusterFailoverTest {
     }
 
     @Test
-    public void testAutoClusterFailoverSwitchTlsTrustStore() throws 
IOException {
+    public void testAutoClusterFailoverSwitchTlsTrustStore() throws Exception {
         String primary = "pulsar+ssl://localhost:6651";
         String secondary = "pulsar+ssl://localhost:6661";
         long failoverDelay = 1;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 36160d40d54..fa7145794e1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@ import static org.mockito.Mockito.when;
 @Test(groups = "broker-impl")
 public class ControlledClusterFailoverTest {
     @Test
-    public void testBuildControlledClusterFailoverInstance() throws 
IOException {
+    public void testBuildControlledClusterFailoverInstance() throws Exception {
         String defaultServiceUrl = "pulsar://localhost:6650";
         String urlProvider = "http://localhost:8080/test";;
         String keyA = "key-a";
@@ -67,7 +66,7 @@ public class ControlledClusterFailoverTest {
     }
 
     @Test
-    public void testControlledClusterFailoverSwitch() throws IOException {
+    public void testControlledClusterFailoverSwitch() throws Exception {
         String defaultServiceUrl = "pulsar+ssl://localhost:6651";
         String backupServiceUrl = "pulsar+ssl://localhost:6661";
         String urlProvider = "http://localhost:8080";;


Reply via email to