This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 06a2f5cc63c [improve] [client]Add new ServiceUrlProvider
implementation: SameAuthParamsAutoClusterFailover (#23129)
06a2f5cc63c is described below
commit 06a2f5cc63c126a0262bec4602b81c1c716e4e36
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 13 00:47:04 2024 +0800
[improve] [client]Add new ServiceUrlProvider implementation:
SameAuthParamsAutoClusterFailover (#23129)
---
...ameAuthParamsLookupAutoClusterFailoverTest.java | 176 +++++++++++
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 +-
.../broker/service/NetworkErrorTestBase.java | 2 +-
.../broker/service/OneWayReplicatorTestBase.java | 21 +-
.../client/api/NonDurableSubscriptionTest.java | 33 +-
.../pulsar/client/api/ServiceUrlProvider.java | 2 +-
.../SameAuthParamsLookupAutoClusterFailover.java | 341 +++++++++++++++++++++
.../client/impl/AutoClusterFailoverTest.java | 12 +-
.../client/impl/ControlledClusterFailoverTest.java | 5 +-
9 files changed, 572 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
new file mode 100644
index 00000000000..b39f8135e0e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SameAuthParamsLookupAutoClusterFailoverTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.getTlsFileForClient;
+import static
org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover.PulsarServiceState;
+import io.netty.channel.EventLoopGroup;
+import java.net.ServerSocket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.NetworkErrorTestBase;
+import org.apache.pulsar.broker.service.OneWayReplicatorTestBase;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.SameAuthParamsLookupAutoClusterFailover;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class SameAuthParamsLookupAutoClusterFailoverTest extends
OneWayReplicatorTestBase {
+
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true, timeOut = 300000)
+ public void cleanup() throws Exception {
+ super.cleanup();
+ }
+
+ @DataProvider(name = "enabledTls")
+ public Object[][] enabledTls () {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "enabledTls", timeOut = 240 * 1000)
+ public void testAutoClusterFailover(boolean enabledTls) throws Exception {
+ // Start clusters.
+ setup();
+ ServerSocket dummyServer = new
ServerSocket(NetworkErrorTestBase.getOneFreePort());
+
+ // Initialize client.
+ String urlProxy = enabledTls ? "pulsar+tls://127.0.0.1:" +
dummyServer.getLocalPort()
+ : "pulsar://127.0.0.1:" + dummyServer.getLocalPort();
+ String url1 = enabledTls ? pulsar1.getBrokerServiceUrlTls() :
pulsar1.getBrokerServiceUrl();
+ String url2 = enabledTls ? pulsar2.getBrokerServiceUrlTls() :
pulsar2.getBrokerServiceUrl();
+ final String[] urlArray = new String[]{url1, urlProxy, url2};
+ final SameAuthParamsLookupAutoClusterFailover failover =
SameAuthParamsLookupAutoClusterFailover.builder()
+ .pulsarServiceUrlArray(urlArray)
+ .failoverThreshold(5)
+ .recoverThreshold(5)
+ .checkHealthyIntervalMs(300)
+ .testTopic("a/b/c")
+ .markTopicNotFoundAsAvailable(true)
+ .build();
+ ClientBuilder clientBuilder =
PulsarClient.builder().serviceUrlProvider(failover);
+ if (enabledTls) {
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", getTlsFileForClient("admin.cert"));
+ authParams.put("tlsKeyFile", getTlsFileForClient("admin.key-pk8"));
+ clientBuilder.authentication(AuthenticationTls.class.getName(),
authParams)
+ .enableTls(true)
+ .allowTlsInsecureConnection(false)
+ .tlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+ }
+ final PulsarClient client = clientBuilder.build();
+ failover.initialize(client);
+ final EventLoopGroup executor =
WhiteboxImpl.getInternalState(failover, "executor");
+ final PulsarServiceState[] stateArray =
+ WhiteboxImpl.getInternalState(failover,
"pulsarServiceStateArray");
+
+ // Test that everything works fine.
+ final String tp = BrokerTestUtil.newUniqueName(nonReplicatedNamespace + "/tp");
+ final Producer<String> producer =
client.newProducer(Schema.STRING).topic(tp).create();
+ producer.send("0");
+ Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
+
+ CompletableFuture<Boolean> checkStatesFuture1 = new
CompletableFuture<>();
+ executor.submit(() -> {
+ boolean res = stateArray[0] == PulsarServiceState.Healthy;
+ res = res & stateArray[1] == PulsarServiceState.Healthy;
+ res = res & stateArray[2] == PulsarServiceState.Healthy;
+ checkStatesFuture1.complete(res);
+ });
+ Assert.assertTrue(checkStatesFuture1.join());
+
+ // Test failover 0 --> 2.
+ pulsar1.close();
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+ CompletableFuture<Boolean> checkStatesFuture2 = new
CompletableFuture<>();
+ executor.submit(() -> {
+ boolean res = stateArray[0] == PulsarServiceState.Failed;
+ res = res & stateArray[1] == PulsarServiceState.Failed;
+ res = res & stateArray[2] == PulsarServiceState.Healthy;
+ checkStatesFuture2.complete(res);
+ });
+ Assert.assertTrue(checkStatesFuture2.join());
+ producer.send("0->2");
+ Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 2);
+ });
+
+ // Test recover 2 --> 1.
+ executor.execute(() -> {
+ urlArray[1] = url2;
+ });
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+ CompletableFuture<Boolean> checkStatesFuture3 = new
CompletableFuture<>();
+ executor.submit(() -> {
+ boolean res = stateArray[0] == PulsarServiceState.Failed;
+ res = res & stateArray[1] == PulsarServiceState.Healthy;
+ res = res & stateArray[2] == PulsarServiceState.Healthy;
+ checkStatesFuture3.complete(res);
+ });
+ Assert.assertTrue(checkStatesFuture3.join());
+ producer.send("2->1");
+ Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 1);
+ });
+
+ // Test recover 1 --> 0.
+ executor.execute(() -> {
+ urlArray[0] = url2;
+ });
+ Awaitility.await().atMost(60, TimeUnit.SECONDS).untilAsserted(() -> {
+ CompletableFuture<Boolean> checkStatesFuture4 = new
CompletableFuture<>();
+ executor.submit(() -> {
+ boolean res = stateArray[0] == PulsarServiceState.Healthy;
+ res = res & stateArray[1] == PulsarServiceState.Healthy;
+ res = res & stateArray[2] == PulsarServiceState.Healthy;
+ checkStatesFuture4.complete(res);
+ });
+ Assert.assertTrue(checkStatesFuture4.join());
+ producer.send("1->0");
+ Assert.assertEquals(failover.getCurrentPulsarServiceIndex(), 0);
+ });
+
+ // cleanup.
+ producer.close();
+ client.close();
+ dummyServer.close();
+ }
+
+ @Override
+ protected void cleanupPulsarResources() {
+ // Nothing to do.
+ }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index eef4469aa95..e155e399e24 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -84,7 +84,7 @@ import org.testng.annotations.DataProvider;
public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
// All certificate-authority files are copied from the
tests/certificate-authority directory and all share the same
// root CA.
- protected static String getTlsFileForClient(String name) {
+ public static String getTlsFileForClient(String name) {
return
ResourceUtils.getAbsolutePath(String.format("certificate-authority/client-keys/%s.pem",
name));
}
public final static String CA_CERT_FILE_PATH =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
index 36f8cb47612..742194d9b12 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java
@@ -102,7 +102,7 @@ public abstract class NetworkErrorTestBase extends
TestRetrySupport {
log.info("broker-1: {}, broker-2: {}", broker1.getListenPort(),
broker2.getListenPort());
}
- protected int getOneFreePort() throws IOException {
+ public static int getOneFreePort() throws IOException {
ServerSocket serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
serverSocket.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index d66e666e3a0..f3076ebdec6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.service;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_CERT_FILE_PATH;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.BROKER_KEY_FILE_PATH;
+import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.CA_CERT_FILE_PATH;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -267,10 +270,18 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
config.setLoadBalancerSheddingEnabled(false);
config.setForceDeleteNamespaceAllowed(true);
+ config.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
+ config.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
+ config.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH);
+ config.setClusterName(clusterName);
+ config.setTlsRequireTrustedClientCertOnConnect(false);
+ Set<String> tlsProtocols = Sets.newConcurrentHashSet();
+ tlsProtocols.add("TLSv1.3");
+ tlsProtocols.add("TLSv1.2");
+ config.setTlsProtocols(tlsProtocols);
}
- @Override
- protected void cleanup() throws Exception {
+ protected void cleanupPulsarResources() throws Exception {
// delete namespaces.
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace,
Sets.newHashSet(cluster1));
@@ -283,6 +294,12 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
admin2.namespaces().deleteNamespace(replicatedNamespace, true);
admin2.namespaces().deleteNamespace(nonReplicatedNamespace, true);
}
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ // cleanup pulsar resources.
+ cleanupPulsarResources();
// shutdown.
markCurrentSetupNumberCleaned();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
index bbac688d922..80adc79e6fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonDurableSubscriptionTest.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.bookkeeper.mledger.impl.ImmutablePositionImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -46,10 +47,12 @@ import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
+import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -542,18 +545,34 @@ public class NonDurableSubscriptionTest extends
ProducerConsumerBase {
.getStats(topicName, true, true,
true).getSubscriptions().get("s1");
log.info("backlog size: {}", subscriptionStats.getMsgBacklog());
assertEquals(subscriptionStats.getMsgBacklog(), 0);
- ManagedLedgerInternalStats.CursorStats cursorStats =
-
admin.topics().getInternalStats(topicName).cursors.get("s1");
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
+ ManagedLedgerInternalStats.CursorStats cursorStats =
internalStats.cursors.get("s1");
String[] ledgerIdAndEntryId =
cursorStats.markDeletePosition.split(":");
- Position actMarkDeletedPos =
-
PositionFactory.create(Long.valueOf(ledgerIdAndEntryId[0]),
Long.valueOf(ledgerIdAndEntryId[1]));
- Position expectedMarkDeletedPos =
-
PositionFactory.create(msgIdInDeletedLedger5.getLedgerId(),
msgIdInDeletedLedger5.getEntryId());
+ ImmutablePositionImpl actMarkDeletedPos =
+ new
ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId[0]),
Long.valueOf(ledgerIdAndEntryId[1]));
+ ImmutablePositionImpl expectedMarkDeletedPos =
+ new
ImmutablePositionImpl(msgIdInDeletedLedger5.getLedgerId(),
msgIdInDeletedLedger5.getEntryId());
+ log.info("LAC: {}", internalStats.lastConfirmedEntry);
log.info("Expected mark deleted position: {}",
expectedMarkDeletedPos);
log.info("Actual mark deleted position: {}",
cursorStats.markDeletePosition);
- assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >=
0);
+
AssertJUnit.assertTrue(actMarkDeletedPos.compareTo(expectedMarkDeletedPos) >=
0);
});
+ admin.topics().createSubscription(topicName, "s2", MessageId.earliest);
+ admin.topics().createSubscription(topicName, "s3", MessageId.latest);
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
+ ManagedLedgerInternalStats.CursorStats cursorStats2 =
internalStats.cursors.get("s2");
+ String[] ledgerIdAndEntryId2 =
cursorStats2.markDeletePosition.split(":");
+ ImmutablePositionImpl actMarkDeletedPos2 =
+ new
ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId2[0]),
Long.valueOf(ledgerIdAndEntryId2[1]));
+ ManagedLedgerInternalStats.CursorStats cursorStats3 =
internalStats.cursors.get("s3");
+ String[] ledgerIdAndEntryId3 =
cursorStats3.markDeletePosition.split(":");
+ ImmutablePositionImpl actMarkDeletedPos3 =
+ new
ImmutablePositionImpl(Long.valueOf(ledgerIdAndEntryId3[0]),
Long.valueOf(ledgerIdAndEntryId3[1]));
+ log.info("LAC: {}", internalStats.lastConfirmedEntry);
+ log.info("Actual mark deleted position 2: {}", actMarkDeletedPos2);
+ log.info("Actual mark deleted position 3: {}", actMarkDeletedPos3);
+ pulsar.getBrokerService().getTopic(topicName, false).join().get();
// cleanup.
reader.close();
producer.close();
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
index 5cb22276553..e8b513b103f 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
@@ -56,7 +56,7 @@ public interface ServiceUrlProvider extends AutoCloseable {
*
*/
@Override
- default void close() {
+ default void close() throws Exception {
// do nothing
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
new file mode 100644
index 00000000000..4beff4719c8
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SameAuthParamsLookupAutoClusterFailover.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.ScheduledFuture;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+@Slf4j
+@SuppressFBWarnings(value = {"EI_EXPOSE_REP2"})
+public class SameAuthParamsLookupAutoClusterFailover implements
ServiceUrlProvider {
+
+ private PulsarClientImpl pulsarClient;
+ private EventLoopGroup executor;
+ private volatile boolean closed;
+ private ScheduledFuture<?> scheduledCheckTask;
+ @Getter
+ private int failoverThreshold = 5;
+ @Getter
+ private int recoverThreshold = 5;
+ @Getter
+ private long checkHealthyIntervalMs = 1000;
+ @Getter
+ private boolean markTopicNotFoundAsAvailable = true;
+ @Getter
+ private String testTopic = "public/default/tp_test";
+
+ private String[] pulsarServiceUrlArray;
+ private PulsarServiceState[] pulsarServiceStateArray;
+ private MutableInt[] checkCounterArray;
+ @Getter
+ private volatile int currentPulsarServiceIndex;
+
+ private SameAuthParamsLookupAutoClusterFailover() {}
+
+ @Override
+ public void initialize(PulsarClient client) {
+ this.currentPulsarServiceIndex = 0;
+ this.pulsarClient = (PulsarClientImpl) client;
+ this.executor = EventLoopUtil.newEventLoopGroup(1, false,
+ new
ExecutorProvider.ExtendedThreadFactory("broker-service-url-check"));
+ scheduledCheckTask = executor.scheduleAtFixedRate(() -> {
+ if (closed) {
+ return;
+ }
+ checkPulsarServices();
+ int firstHealthyPulsarService = firstHealthyPulsarService();
+ if (firstHealthyPulsarService == currentPulsarServiceIndex) {
+ return;
+ }
+ if (firstHealthyPulsarService < 0) {
+ int failoverTo = findFailoverTo();
+ if (failoverTo < 0) {
+ // No healthy pulsar service to connect.
+ log.error("Failed to choose a pulsar service to connect,
no one pulsar service is healthy. Current"
+ + " pulsar service: [{}] {}. States: {}, Counters:
{}", currentPulsarServiceIndex,
+ pulsarServiceUrlArray[currentPulsarServiceIndex],
Arrays.toString(pulsarServiceStateArray),
+ Arrays.toString(checkCounterArray));
+ } else {
+ // Failover to low priority pulsar service.
+ updateServiceUrl(failoverTo);
+ }
+ } else {
+ // Back to high priority pulsar service.
+ updateServiceUrl(firstHealthyPulsarService);
+ }
+ }, checkHealthyIntervalMs, checkHealthyIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public String getServiceUrl() {
+ return pulsarServiceUrlArray[currentPulsarServiceIndex];
+ }
+
+    /**
+     * Stops the periodic health check and shuts down the executor that runs it.
+     *
+     * <p>Safe to call when {@link #initialize(PulsarClient)} was never invoked: in that case there is no
+     * scheduled task or executor to release.
+     */
+    @Override
+    public void close() throws Exception {
+        log.info("Closing service url provider. Current pulsar service: [{}] {}", currentPulsarServiceIndex,
+                pulsarServiceUrlArray[currentPulsarServiceIndex]);
+        // Set the flag first so that a check task which is already queued returns immediately.
+        closed = true;
+        // Both fields are only assigned by initialize(), so guard against closing an uninitialized provider.
+        if (scheduledCheckTask != null) {
+            scheduledCheckTask.cancel(false);
+        }
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+ private int firstHealthyPulsarService() {
+ for (int i = 0; i <= currentPulsarServiceIndex; i++) {
+ if (pulsarServiceStateArray[i] == PulsarServiceState.Healthy
+ || pulsarServiceStateArray[i] ==
PulsarServiceState.PreFail) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+    /**
+     * Probes the services that have a lower priority than the current one, in array order, and returns the
+     * index of the first service that answers the probe.
+     *
+     * @return the index of the first available lower-priority service, or -1 if none is available
+     */
+    private int findFailoverTo() {
+        // The upper bound is exclusive: probing index "length" would read past the end of the url array and
+        // throw from inside the scheduled health check task.
+        for (int i = currentPulsarServiceIndex + 1; i < pulsarServiceUrlArray.length; i++) {
+            if (probeAvailable(i)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+ private void checkPulsarServices() {
+ for (int i = 0; i <= currentPulsarServiceIndex; i++) {
+ if (probeAvailable(i)) {
+ switch (pulsarServiceStateArray[i]) {
+ case Healthy: {
+ break;
+ }
+ case PreFail: {
+ pulsarServiceStateArray[i] =
PulsarServiceState.Healthy;
+ checkCounterArray[i].setValue(0);
+ break;
+ }
+ case Failed: {
+ pulsarServiceStateArray[i] =
PulsarServiceState.PreRecover;
+ checkCounterArray[i].setValue(1);
+ break;
+ }
+ case PreRecover: {
+
checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1);
+ if (checkCounterArray[i].getValue() >=
recoverThreshold) {
+ pulsarServiceStateArray[i] =
PulsarServiceState.Healthy;
+ checkCounterArray[i].setValue(0);
+ }
+ break;
+ }
+ }
+ } else {
+ switch (pulsarServiceStateArray[i]) {
+ case Healthy: {
+ pulsarServiceStateArray[i] =
PulsarServiceState.PreFail;
+ checkCounterArray[i].setValue(1);
+ break;
+ }
+ case PreFail: {
+
checkCounterArray[i].setValue(checkCounterArray[i].getValue() + 1);
+ if (checkCounterArray[i].getValue() >=
failoverThreshold) {
+ pulsarServiceStateArray[i] =
PulsarServiceState.Failed;
+ checkCounterArray[i].setValue(0);
+ }
+ break;
+ }
+ case Failed: {
+ break;
+ }
+ case PreRecover: {
+ pulsarServiceStateArray[i] = PulsarServiceState.Failed;
+ checkCounterArray[i].setValue(0);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ private boolean probeAvailable(int brokerServiceIndex) {
+ String url = pulsarServiceUrlArray[brokerServiceIndex];
+ try {
+ LookupTopicResult res =
pulsarClient.getLookup(url).getBroker(TopicName.get(testTopic))
+ .get(3, TimeUnit.SECONDS);
+ if (log.isDebugEnabled()) {
+ log.debug("Success to probe available(lookup res: {}), [{}]
{}}. States: {}, Counters: {}",
+ res.toString(), brokerServiceIndex, url,
Arrays.toString(pulsarServiceStateArray),
+ Arrays.toString(checkCounterArray));
+ }
+ return true;
+ } catch (Exception e) {
+ Throwable actEx = FutureUtil.unwrapCompletionException(e);
+ if (actEx instanceof PulsarAdminException.NotFoundException
+ || actEx instanceof PulsarClientException.NotFoundException
+ || actEx instanceof
PulsarClientException.TopicDoesNotExistException
+ || actEx instanceof PulsarClientException.LookupException)
{
+ if (markTopicNotFoundAsAvailable) {
+ if (log.isDebugEnabled()) {
+ log.debug("Success to probe available(case
tenant/namespace/topic not found), [{}] {}."
+ + " States: {}, Counters: {}",
brokerServiceIndex, url,
+ Arrays.toString(pulsarServiceStateArray),
Arrays.toString(checkCounterArray));
+ }
+ return true;
+ } else {
+ log.warn("Failed to probe available(error
tenant/namespace/topic not found), [{}] {}. States: {},"
+ + " Counters: {}", brokerServiceIndex, url,
Arrays.toString(pulsarServiceStateArray),
+ Arrays.toString(checkCounterArray));
+ return false;
+ }
+ }
+ log.warn("Failed to probe available, [{}] {}. States: {},
Counters: {}", brokerServiceIndex, url,
+ Arrays.toString(pulsarServiceStateArray),
Arrays.toString(checkCounterArray));
+ return false;
+ }
+ }
+
+ private void updateServiceUrl(int targetIndex) {
+ String currentUrl = pulsarServiceUrlArray[currentPulsarServiceIndex];
+ String targetUrl = pulsarServiceUrlArray[targetIndex];
+ String logMsg;
+ if (targetIndex < currentPulsarServiceIndex) {
+ logMsg = String.format("Recover to high priority pulsar service
[%s] %s --> [%s] %s. States: %s,"
+ + " Counters: %s", currentPulsarServiceIndex, currentUrl,
targetIndex, targetUrl,
+ Arrays.toString(pulsarServiceStateArray),
Arrays.toString(checkCounterArray));
+ } else {
+ logMsg = String.format("Failover to low priority pulsar service
[%s] %s --> [%s] %s. States: %s,"
+ + " Counters: %s", currentPulsarServiceIndex, currentUrl,
targetIndex, targetUrl,
+ Arrays.toString(pulsarServiceStateArray),
Arrays.toString(checkCounterArray));
+ }
+ log.info(logMsg);
+ try {
+ pulsarClient.updateServiceUrl(targetUrl);
+ pulsarClient.reloadLookUp();
+ currentPulsarServiceIndex = targetIndex;
+ } catch (Exception e) {
+ log.error("Failed to {}", logMsg, e);
+ }
+ }
+
+ public enum PulsarServiceState {
+ Healthy,
+ PreFail,
+ Failed,
+ PreRecover;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private SameAuthParamsLookupAutoClusterFailover
+ sameAuthParamsLookupAutoClusterFailover = new
SameAuthParamsLookupAutoClusterFailover();
+
+ public Builder failoverThreshold(int failoverThreshold) {
+ if (failoverThreshold < 1) {
+ throw new IllegalArgumentException("failoverThreshold must be
larger than 0");
+ }
+ sameAuthParamsLookupAutoClusterFailover.failoverThreshold =
failoverThreshold;
+ return this;
+ }
+
+ public Builder recoverThreshold(int recoverThreshold) {
+ if (recoverThreshold < 1) {
+ throw new IllegalArgumentException("recoverThreshold must be
larger than 0");
+ }
+ sameAuthParamsLookupAutoClusterFailover.recoverThreshold =
recoverThreshold;
+ return this;
+ }
+
+ public Builder checkHealthyIntervalMs(int checkHealthyIntervalMs) {
+ if (checkHealthyIntervalMs < 1) {
+ throw new IllegalArgumentException("checkHealthyIntervalMs
must be larger than 0");
+ }
+ sameAuthParamsLookupAutoClusterFailover.checkHealthyIntervalMs =
checkHealthyIntervalMs;
+ return this;
+ }
+
+        /**
+         * Sets the topic that is looked up to probe whether a pulsar service is available.
+         *
+         * @param testTopic the topic name used by the health check; must not be blank
+         * @return this builder
+         * @throws IllegalArgumentException if {@code testTopic} is null, empty or whitespace only
+         */
+        public Builder testTopic(String testTopic) {
+            // Only blankness is checked here; the name itself is parsed later, when the probe performs the
+            // lookup.
+            if (StringUtils.isBlank(testTopic)) {
+                throw new IllegalArgumentException("testTopic can not be blank");
+            }
+            sameAuthParamsLookupAutoClusterFailover.testTopic = testTopic;
+            return this;
+        }
+
+ public Builder markTopicNotFoundAsAvailable(boolean
markTopicNotFoundAsAvailable) {
+
sameAuthParamsLookupAutoClusterFailover.markTopicNotFoundAsAvailable =
markTopicNotFoundAsAvailable;
+ return this;
+ }
+
+ public Builder pulsarServiceUrlArray(String[] pulsarServiceUrlArray) {
+ if (pulsarServiceUrlArray == null || pulsarServiceUrlArray.length
== 0) {
+ throw new IllegalArgumentException("pulsarServiceUrlArray can
not be empty");
+ }
+ sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray =
pulsarServiceUrlArray;
+ int pulsarServiceLen = pulsarServiceUrlArray.length;
+ HashSet<String> uniqueChecker = new HashSet<>();
+ for (int i = 0; i < pulsarServiceLen; i++) {
+ String pulsarService = pulsarServiceUrlArray[i];
+ if (StringUtils.isBlank(pulsarService)) {
+ throw new IllegalArgumentException("pulsarServiceUrlArray
contains a blank value at index " + i);
+ }
+ if (pulsarService.startsWith("http") ||
pulsarService.startsWith("HTTP")) {
+ throw new
IllegalArgumentException("SameAuthParamsLookupAutoClusterFailover does not
support HTTP"
+ + " protocol pulsar service url so far.");
+ }
+ if (!uniqueChecker.add(pulsarService)) {
+ throw new IllegalArgumentException("pulsarServiceUrlArray
contains duplicated value "
+ + pulsarServiceUrlArray[i]);
+ }
+ }
+ return this;
+ }
+
+ public SameAuthParamsLookupAutoClusterFailover build() {
+ String[] pulsarServiceUrlArray =
sameAuthParamsLookupAutoClusterFailover.pulsarServiceUrlArray;
+ if (pulsarServiceUrlArray == null) {
+ throw new IllegalArgumentException("pulsarServiceUrlArray can
not be empty");
+ }
+ int pulsarServiceLen = pulsarServiceUrlArray.length;
+ sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray =
new PulsarServiceState[pulsarServiceLen];
+ sameAuthParamsLookupAutoClusterFailover.checkCounterArray = new
MutableInt[pulsarServiceLen];
+ for (int i = 0; i < pulsarServiceLen; i++) {
+
sameAuthParamsLookupAutoClusterFailover.pulsarServiceStateArray[i] =
PulsarServiceState.Healthy;
+ sameAuthParamsLookupAutoClusterFailover.checkCounterArray[i] =
new MutableInt(0);
+ }
+ return sameAuthParamsLookupAutoClusterFailover;
+ }
+ }
+}
+
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 545cf7483e4..b275ffb6012 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -32,7 +31,6 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.awaitility.Awaitility;
@@ -43,7 +41,7 @@ import org.testng.annotations.Test;
@Slf4j
public class AutoClusterFailoverTest {
@Test
- public void testBuildAutoClusterFailoverInstance() throws
PulsarClientException {
+ public void testBuildAutoClusterFailoverInstance() throws Exception {
String primary = "pulsar://localhost:6650";
String secondary = "pulsar://localhost:6651";
long failoverDelay = 30;
@@ -106,7 +104,7 @@ public class AutoClusterFailoverTest {
}
@Test
- public void testInitialize() {
+ public void testInitialize() throws Exception {
String primary = "pulsar://localhost:6650";
String secondary = "pulsar://localhost:6651";
long failoverDelay = 10;
@@ -151,7 +149,7 @@ public class AutoClusterFailoverTest {
}
@Test
- public void testAutoClusterFailoverSwitchWithoutAuthentication() {
+ public void testAutoClusterFailoverSwitchWithoutAuthentication() throws
Exception {
String primary = "pulsar://localhost:6650";
String secondary = "pulsar://localhost:6651";
long failoverDelay = 1;
@@ -187,7 +185,7 @@ public class AutoClusterFailoverTest {
}
@Test
- public void testAutoClusterFailoverSwitchWithAuthentication() throws
IOException {
+ public void testAutoClusterFailoverSwitchWithAuthentication() throws
Exception {
String primary = "pulsar+ssl://localhost:6651";
String secondary = "pulsar+ssl://localhost:6661";
long failoverDelay = 1;
@@ -251,7 +249,7 @@ public class AutoClusterFailoverTest {
}
@Test
- public void testAutoClusterFailoverSwitchTlsTrustStore() throws
IOException {
+ public void testAutoClusterFailoverSwitchTlsTrustStore() throws Exception {
String primary = "pulsar+ssl://localhost:6651";
String secondary = "pulsar+ssl://localhost:6661";
long failoverDelay = 1;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 36160d40d54..fa7145794e1 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@ import static org.mockito.Mockito.when;
@Test(groups = "broker-impl")
public class ControlledClusterFailoverTest {
@Test
- public void testBuildControlledClusterFailoverInstance() throws
IOException {
+ public void testBuildControlledClusterFailoverInstance() throws Exception {
String defaultServiceUrl = "pulsar://localhost:6650";
String urlProvider = "http://localhost:8080/test";
String keyA = "key-a";
@@ -67,7 +66,7 @@ public class ControlledClusterFailoverTest {
}
@Test
- public void testControlledClusterFailoverSwitch() throws IOException {
+ public void testControlledClusterFailoverSwitch() throws Exception {
String defaultServiceUrl = "pulsar+ssl://localhost:6651";
String backupServiceUrl = "pulsar+ssl://localhost:6661";
String urlProvider = "http://localhost:8080";