This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9fd5b0862002b349d961a649c5c714bf01055a56 Author: fengyubiao <[email protected]> AuthorDate: Tue Aug 13 00:47:04 2024 +0800 [improve] [client]Add new ServiceUrlProvider implementation: SameAuthParamsAutoClusterFailover (#23129) (cherry picked from commit 06a2f5cc63c126a0262bec4602b81c1c716e4e36) --- ...ameAuthParamsLookupAutoClusterFailoverTest.java | 176 +++++++++++ .../broker/auth/MockedPulsarServiceBaseTest.java | 2 +- .../broker/service/NetworkErrorTestBase.java | 2 +- .../broker/service/OneWayReplicatorTestBase.java | 21 +- .../pulsar/client/api/ServiceUrlProvider.java | 2 +- .../SameAuthParamsLookupAutoClusterFailover.java | 341 +++++++++++++++++++++ .../client/impl/AutoClusterFailoverTest.java | 12 +- .../client/impl/ControlledClusterFailoverTest.java | 5 +- 8 files changed, 546 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java new file mode 100644 index 00000000000..b39f8135e0e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient; +import static org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState; +import io.netty.channel.EventLoopGroup; +import java.net.ServerSocket; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.NetworkErrorTestBase; +import org.apache.pulsar.broker.service.OneWayReplicatorTestBase; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class SameAuthParamsLookupAutoClusterFailoverTest extends OneWayReplicatorTestBase { + + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterMethod(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + + @DataProvider(name = "enabledTls") + public Object[][] enabledTls () { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "enabledTls", timeOut = 240 * 1000) + public void testAutoClusterFailover(boolean enabledTls) throws Exception { + // Start clusters. + setup(); + ServerSocket dummyServer = new ServerSocket(NetworkErrorTestBase.getOneFreePort()); + + // Initialize client. + String urlProxy = enabledTls ? "pulsar+tls://127.0.0.1:" + dummyServer.getLocalPort() + : "pulsar://127.0.0.1:" + dummyServer.getLocalPort(); + String url1 = enabledTls ? pulsar1.getBrokerServiceUrlTls() : pulsar1.getBrokerServiceUrl(); + String url2 = enabledTls ? pulsar2.getBrokerServiceUrlTls() : pulsar2.getBrokerServiceUrl(); + final String[] urlArray = new String[]{url1, urlProxy, url2}; + final SameAuthParamsLookupAutoClusterFailover failover = SameAuthParamsLookupAutoClusterFailover.builder() + .pulsarServiceUrlArray(urlArray) + .failoverThreshold(5) + .recoverThreshold(5) + .checkHealthyIntervalMs(300) + .testTopic("a/b/c") + .markTopicNotFoundAsAvailable(true) + .build(); + ClientBuilder clientBuilder = PulsarClient.builder().serviceUrlProvider(failover); + if (enabledTls) { + Map<String, String> authParams = new HashMap<>(); + authParams.put("tlsCertFile", getTlsFileForClient("admin.cert")); + authParams.put("tlsKeyFile", getTlsFileForClient("admin.key-pk8")); + clientBuilder.authentication(AuthenticationTls.class.getName(), authParams) + .enableTls(true) + .allowTlsInsecureConnection(false) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH); + } + final PulsarClient client = clientBuilder.build(); + failover.initialize(client); + final EventLoopGroup executor = WhiteboxImpl.getInternalState(failover, "executor"); + final PulsarServiceState[] stateArray = + WhiteboxImpl.getInternalState(failover, "pulsarServiceStateArray"); + + // Test all things is fine. + final String tp = BrokerTestUtil.newUniqueName(nonReplicatedNamespace + "/tp"); + final Producer<String> producer = client.newProducer(Schema.STRING).topic(tp).create(); + producer.send("0"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0); + + CompletableFuture<Boolean> checkStatesFuture1 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Healthy; + res = res & stateArray[1] == PulsarServiceState.Healthy; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture1.complete(res); + }); + Assert.assertTrue(checkStatesFuture1.join()); + + // Test failover 0 --> 3. + pulsar1.close(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture<Boolean> checkStatesFuture2 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Failed; + res = res & stateArray[1] == PulsarServiceState.Failed; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture2.complete(res); + }); + Assert.assertTrue(checkStatesFuture2.join()); + producer.send("0->2"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2); + }); + + // Test recover 2 --> 1. + executor.execute(() -> { + urlArray[1] = url2; + }); + Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture<Boolean> checkStatesFuture3 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Failed; + res = res & stateArray[1] == PulsarServiceState.Healthy; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture3.complete(res); + }); + Assert.assertTrue(checkStatesFuture3.join()); + producer.send("2->1"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1); + }); + + // Test recover 1 --> 0. + executor.execute(() -> { + urlArray[0] = url2; + }); + Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> { + CompletableFuture<Boolean> checkStatesFuture4 = new CompletableFuture<>(); + executor.submit(() -> { + boolean res = stateArray[0] == PulsarServiceState.Healthy; + res = res & stateArray[1] == PulsarServiceState.Healthy; + res = res & stateArray[2] == PulsarServiceState.Healthy; + checkStatesFuture4.complete(res); + }); + Assert.assertTrue(checkStatesFuture4.join()); + producer.send("1->0"); + Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0); + }); + + // cleanup. + producer.close(); + client.close(); + dummyServer.close(); + } + + @Override + protected void cleanupPulsarResources() { + // Nothing to do. + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index eef4469aa95..e155e399e24 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -84,7 +84,7 @@ import org.testng.annotations.DataProvider; public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport { // All certificate-authority files are copied from the tests/certificate-authority directory and all share the same // root CA. - protected static String getTlsFileForClient(String name) { + public static String getTlsFileForClient(String name) { return ResourceUtils.getAbsolutePath(String.format("certificate-authority/client-keys/%s.pem", name)); } public final static String CA_CERT_FILE_PATH = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java index 36f8cb47612..742194d9b12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java @@ -102,7 +102,7 @@ public abstract class NetworkErrorTestBase extends TestRetrySupport { log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(), broker2.getListenPort()); } - protected int getOneFreePort() throws IOException { + public static int getOneFreePort() throws IOException { ServerSocket serverSocket = new ServerSocket(0); int port = serverSocket.getLocalPort(); serverSocket.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index d66e666e3a0..f3076ebdec6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_CERT_FILE_PATH; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -267,10 +270,18 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); config.setLoadBalancerSheddingEnabled(false); config.setForceDeleteNamespaceAllowed(true); + config.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); + config.setTlsKeyFilePath(BROKER_KEY_FILE_PATH); + config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); + config.setClusterName(clusterName); + config.setTlsRequireTrustedClientCertOnConnect(false); + Set<String> tlsProtocols = Sets.newConcurrentHashSet(); + tlsProtocols.add("TLSv1.3"); + tlsProtocols.add("TLSv1.2"); + config.setTlsProtocols(tlsProtocols); } - @Override - protected void cleanup() throws Exception { + protected void cleanupPulsarResources() throws Exception { // delete namespaces. waitChangeEventsInit(replicatedNamespace); admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1)); @@ -283,6 +294,12 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport { admin2.namespaces().deleteNamespace(replicatedNamespace, true); admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true); } + } + + @Override + protected void cleanup() throws Exception { + // cleanup pulsar resources. + cleanupPulsarResources(); // shutdown. markCurrentSetupNumberCleaned(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java index 5cb22276553..e8b513b103f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java @@ -56,7 +56,7 @@ public interface ServiceUrlProvider extends AutoCloseable { * */ @Override - default void close() { + default void close() throws Exception { // do nothing } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java new file mode 100644 index 00000000000..4beff4719c8 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.ScheduledFuture; +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.ServiceUrlProvider; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.netty.EventLoopUtil; + +@Slf4j +@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"}) +public class SameAuthParamsLookupAutoClusterFailover implements ServiceUrlProvider { + + private PulsarClientImpl pulsarClient; + private EventLoopGroup executor; + private volatile boolean closed; + private ScheduledFuture<?> scheduledCheckTask; + @Getter + private int failoverThreshold = 5; + @Getter + private int recoverThreshold = 5; + @Getter + private long checkHealthyIntervalMs = 1000; + @Getter + private boolean markTopicNotFoundAsAvailable = true; + @Getter + private String testTopic = "public/default/tp_test"; + + private String[] pulsarServiceUrlArray; + private PulsarServiceState[] pulsarServiceStateArray; + private MutableInt[] checkCounterArray; + @Getter + private volatile int currentPulsarServiceIndex; + + private SameAuthParamsLookupAutoClusterFailover() {} + + @Override + public void initialize(PulsarClient client) { + this.currentPulsarServiceIndex = 0; + this.pulsarClient = (PulsarClientImpl) client; + this.executor = EventLoopUtil.newEventLoopGroup(1, false, + new ExecutorProvider.ExtendedThreadFactory("broker-service-url-check")); + scheduledCheckTask = executor.scheduleAtFixedRate(() -> { + if (closed) { + return; + } + checkPulsarServices(); + int firstHealthyPulsarService = firstHealthyPulsarService(); + if (firstHealthyPulsarService == currentPulsarServiceIndex) { + return; + } + if (firstHealthyPulsarService < 0) { + int failoverTo = findFailoverTo(); + if (failoverTo < 0) { + // No healthy pulsar service to connect. + log.error("Failed to choose a pulsar service to connect, no one pulsar service is healthy. Current" + + " pulsar service: [{}] {}. States: {}, Counters: {}", currentPulsarServiceIndex, + pulsarServiceUrlArray[currentPulsarServiceIndex], Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + } else { + // Failover to low priority pulsar service. + updateServiceUrl(failoverTo); + } + } else { + // Back to high priority pulsar service. + updateServiceUrl(firstHealthyPulsarService); + } + }, checkHealthyIntervalMs, checkHealthyIntervalMs, TimeUnit.MILLISECONDS); + } + + @Override + public String getServiceUrl() { + return pulsarServiceUrlArray[currentPulsarServiceIndex]; + } + + @Override + public void close() throws Exception { + log.info("Closing service url provider. Current pulsar service: [{}] {}", currentPulsarServiceIndex, + pulsarServiceUrlArray[currentPulsarServiceIndex]); + closed = true; + scheduledCheckTask.cancel(false); + executor.shutdownNow(); + } + + private int firstHealthyPulsarService() { + for (int i = 0; i <= currentPulsarServiceIndex; i++) { + if (pulsarServiceStateArray[i] == PulsarServiceState.Healthy + || pulsarServiceStateArray[i] == PulsarServiceState.PreFail) { + return i; + } + } + return -1; + } + + private int findFailoverTo() { + for (int i = currentPulsarServiceIndex + 1; i <= pulsarServiceUrlArray.length; i++) { + if (probeAvailable(i)) { + return i; + } + } + return -1; + } + + private void checkPulsarServices() { + for (int i = 0; i <= currentPulsarServiceIndex; i++) { + if (probeAvailable(i)) { + switch (pulsarServiceStateArray[i]) { + case Healthy: { + break; + } + case PreFail: { + pulsarServiceStateArray[i] = PulsarServiceState.Healthy; + checkCounterArray[i].setValue(0); + break; + } + case Failed: { + pulsarServiceStateArray[i] = PulsarServiceState.PreRecover; + checkCounterArray[i].setValue(1); + break; + } + case PreRecover: { + checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1); + if (checkCounterArray[i].getValue() >= recoverThreshold) { + pulsarServiceStateArray[i] = PulsarServiceState.Healthy; + checkCounterArray[i].setValue(0); + } + break; + } + } + } else { + switch (pulsarServiceStateArray[i]) { + case Healthy: { + pulsarServiceStateArray[i] = PulsarServiceState.PreFail; + checkCounterArray[i].setValue(1); + break; + } + case PreFail: { + checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1); + if (checkCounterArray[i].getValue() >= failoverThreshold) { + pulsarServiceStateArray[i] = PulsarServiceState.Failed; + checkCounterArray[i].setValue(0); + } + break; + } + case Failed: { + break; + } + case PreRecover: { + pulsarServiceStateArray[i] = PulsarServiceState.Failed; + checkCounterArray[i].setValue(0); + break; + } + } + } + } + } + + private boolean probeAvailable(int brokerServiceIndex) { + String url = pulsarServiceUrlArray[brokerServiceIndex]; + try { + LookupTopicResult res = pulsarClient.getLookup(url).getBroker(TopicName.get(testTopic)) + .get(3, TimeUnit.SECONDS); + if (log.isDebugEnabled()) { + log.debug("Success to probe available(lookup res: {}), [{}] {}}. States: {}, Counters: {}", + res.toString(), brokerServiceIndex, url, Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + } + return true; + } catch (Exception e) { + Throwable actEx = FutureUtil.unwrapCompletionException(e); + if (actEx instanceof PulsarAdminException.NotFoundException + || actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarClientException.LookupException) { + if (markTopicNotFoundAsAvailable) { + if (log.isDebugEnabled()) { + log.debug("Success to probe available(case tenant/namespace/topic not found), [{}] {}." + + " States: {}, Counters: {}", brokerServiceIndex, url, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + } + return true; + } else { + log.warn("Failed to probe available(error tenant/namespace/topic not found), [{}] {}. States: {}," + + " Counters: {}", brokerServiceIndex, url, Arrays.toString(pulsarServiceStateArray), + Arrays.toString(checkCounterArray)); + return false; + } + } + log.warn("Failed to probe available, [{}] {}. States: {}, Counters: {}", brokerServiceIndex, url, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + return false; + } + } + + private void updateServiceUrl(int targetIndex) { + String currentUrl = pulsarServiceUrlArray[currentPulsarServiceIndex]; + String targetUrl = pulsarServiceUrlArray[targetIndex]; + String logMsg; + if (targetIndex < currentPulsarServiceIndex) { + logMsg = String.format("Recover to high priority pulsar service [%s] %s --> [%s] %s. States: %s," + + " Counters: %s", currentPulsarServiceIndex, currentUrl, targetIndex, targetUrl, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + } else { + logMsg = String.format("Failover to low priority pulsar service [%s] %s --> [%s] %s. States: %s," + + " Counters: %s", currentPulsarServiceIndex, currentUrl, targetIndex, targetUrl, + Arrays.toString(pulsarServiceStateArray), Arrays.toString(checkCounterArray)); + } + log.info(logMsg); + try { + pulsarClient.updateServiceUrl(targetUrl); + pulsarClient.reloadLookUp(); + currentPulsarServiceIndex = targetIndex; + } catch (Exception e) { + log.error("Failed to {}", logMsg, e); + } + } + + public enum PulsarServiceState { + Healthy, + PreFail, + Failed, + PreRecover; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private SameAuthParamsLookupAutoClusterFailover + sameAuthParamsLookupAutoClusterFailover = new SameAuthParamsLookupAutoClusterFailover(); + + public Builder failoverThreshold(int failoverThreshold) { + if (failoverThreshold < 1) { + throw new IllegalArgumentException("failoverThreshold must be larger than 0"); + } + sameAuthParamsLookupAutoClusterFailover.failoverThreshold = failoverThreshold; + return this; + } + + public Builder recoverThreshold(int recoverThreshold) { + if (recoverThreshold < 1) { + throw new IllegalArgumentException("recoverThreshold must be larger than 0"); + } + sameAuthParamsLookupAutoClusterFailover.recoverThreshold = recoverThreshold; + return this; + } + + public Builder checkHealthyIntervalMs(int checkHealthyIntervalMs) { + if (checkHealthyIntervalMs < 1) { + throw new IllegalArgumentException("checkHealthyIntervalMs must be larger than 0"); + } + sameAuthParamsLookupAutoClusterFailover.checkHealthyIntervalMs = checkHealthyIntervalMs; + return this; + } + + public Builder testTopic(String testTopic) { + if (StringUtils.isBlank(testTopic) && TopicName.get(testTopic) != null) { + throw new IllegalArgumentException("testTopic can not be blank"); + } + sameAuthParamsLookupAutoClusterFailover.testTopic = testTopic; + return this; + } + + public Builder markTopicNotFoundAsAvailable(boolean markTopicNotFoundAsAvailable) { + sameAuthParamsLookupAutoClusterFailover.markTopicNotFoundAsAvailable = markTopicNotFoundAsAvailable; + return this; + } + + public Builder pulsarServiceUrlArray(String[] pulsarServiceUrlArray) { + if (pulsarServiceUrlArray == null || pulsarServiceUrlArray.length == 0) { + throw new IllegalArgumentException("pulsarServiceUrlArray can not be empty"); + } + sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray = pulsarServiceUrlArray; + int pulsarServiceLen = pulsarServiceUrlArray.length; + HashSet<String> uniqueChecker = new HashSet<>(); + for (int i = 0; i < pulsarServiceLen; i++) { + String pulsarService = pulsarServiceUrlArray[i]; + if (StringUtils.isBlank(pulsarService)) { + throw new IllegalArgumentException("pulsarServiceUrlArray contains a blank value at index " + i); + } + if (pulsarService.startsWith("http") || pulsarService.startsWith("HTTP")) { + throw new IllegalArgumentException("SameAuthParamsLookupAutoClusterFailover does not support HTTP" + + " protocol pulsar service url so far."); + } + if (!uniqueChecker.add(pulsarService)) { + throw new IllegalArgumentException("pulsarServiceUrlArray contains duplicated value " + + pulsarServiceUrlArray[i]); + } + } + return this; + } + + public SameAuthParamsLookupAutoClusterFailover build() { + String[] pulsarServiceUrlArray = sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray; + if (pulsarServiceUrlArray == null) { + throw new IllegalArgumentException("pulsarServiceUrlArray can not be empty"); + } + int pulsarServiceLen = pulsarServiceUrlArray.length; + sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray = new PulsarServiceState[pulsarServiceLen]; + sameAuthParamsLookupAutoClusterFailover.checkCounterArray = new MutableInt[pulsarServiceLen]; + for (int i = 0; i < pulsarServiceLen; i++) { + sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray[i] = PulsarServiceState.Healthy; + sameAuthParamsLookupAutoClusterFailover.checkCounterArray[i] = new MutableInt(0); + } + return sameAuthParamsLookupAutoClusterFailover; + } + } +} + diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java index 545cf7483e4..b275ffb6012 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -32,7 +31,6 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.awaitility.Awaitility; @@ -43,7 +41,7 @@ import org.testng.annotations.Test; @Slf4j public class AutoClusterFailoverTest { @Test - public void testBuildAutoClusterFailoverInstance() throws PulsarClientException { + public void testBuildAutoClusterFailoverInstance() throws Exception { String primary = "pulsar://localhost:6650"; String secondary = "pulsar://localhost:6651"; long failoverDelay = 30; @@ -106,7 +104,7 @@ public class AutoClusterFailoverTest { } @Test - public void testInitialize() { + public void testInitialize() throws Exception { String primary = "pulsar://localhost:6650"; String secondary = "pulsar://localhost:6651"; long failoverDelay = 10; @@ -151,7 +149,7 @@ public class AutoClusterFailoverTest { } @Test - public void testAutoClusterFailoverSwitchWithoutAuthentication() { + public void testAutoClusterFailoverSwitchWithoutAuthentication() throws Exception { String primary = "pulsar://localhost:6650"; String secondary = "pulsar://localhost:6651"; long failoverDelay = 1; @@ -187,7 +185,7 @@ public class AutoClusterFailoverTest { } @Test - public void testAutoClusterFailoverSwitchWithAuthentication() throws IOException { + public void testAutoClusterFailoverSwitchWithAuthentication() throws Exception { String primary = "pulsar+ssl://localhost:6651"; String secondary = "pulsar+ssl://localhost:6661"; long failoverDelay = 1; @@ -251,7 +249,7 @@ public class AutoClusterFailoverTest { } @Test - public void testAutoClusterFailoverSwitchTlsTrustStore() throws IOException { + public void testAutoClusterFailoverSwitchTlsTrustStore() throws Exception { String primary = "pulsar+ssl://localhost:6651"; String secondary = "pulsar+ssl://localhost:6661"; long failoverDelay = 1; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java index 36160d40d54..fa7145794e1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -37,7 +36,7 @@ import static org.mockito.Mockito.when; @Test(groups = "broker-impl") public class ControlledClusterFailoverTest { @Test - public void testBuildControlledClusterFailoverInstance() throws IOException { + public void testBuildControlledClusterFailoverInstance() throws Exception { String defaultServiceUrl = "pulsar://localhost:6650"; String urlProvider = "http://localhost:8080/test"; String keyA = "key-a"; @@ -67,7 +66,7 @@ public class ControlledClusterFailoverTest { } @Test - public void testControlledClusterFailoverSwitch() throws IOException { + public void testControlledClusterFailoverSwitch() throws Exception { String defaultServiceUrl = "pulsar+ssl://localhost:6651"; String backupServiceUrl = "pulsar+ssl://localhost:6661"; String urlProvider = "http://localhost:8080";
