codelipenghui commented on a change in pull request #13316:
URL: https://github.com/apache/pulsar/pull/13316#discussion_r792399794



##########
File path: 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
##########
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+
+public class AutoClusterFailoverTest {

Review comment:
       Need a test group name

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
##########
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AutoClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.net.ServiceURI;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+@Data
+public class AutoClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private final String primary;
+    private final List<String> secondary;
+    private final AutoClusterFailoverBuilder.SecondaryChoosePolicy 
secondaryChoosePolicy;
+    private final Authentication primaryAuthentication;
+    private final Map<String, Authentication> secondaryAuthentications;
+    private final String primaryTlsTrustCertsFilePath;
+    private final Map<String, String> secondaryTlsTrustCertsFilePaths;
+    private String primaryTlsTrustStorePath;
+    private Map<String, String> secondaryTlsTrustStorePaths;
+    private String primaryTlsTrustStorePassword;
+    private Map<String, String> secondaryTlsTrustStorePasswords;
+    private final long failoverDelayNs;
+    private final long switchBackDelayNs;
+    private final ScheduledExecutorService executor;
+    private long recoverTimestamp;
+    private long failedTimestamp;
+    private final long intervalMs;
+    private static final int TIMEOUT = 30_000;
+
+    private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
+        this.primary = builder.primary;
+        this.secondary = builder.secondary;
+        this.secondaryChoosePolicy = builder.secondaryChoosePolicy;
+        this.primaryAuthentication = builder.primaryAuthentication;
+        this.secondaryAuthentications = builder.secondaryAuthentications;
+        this.primaryTlsTrustCertsFilePath = 
builder.primaryTlsTrustCertsFilePath;
+        this.secondaryTlsTrustCertsFilePaths = 
builder.secondaryTlsTrustCertsFilePaths;
+        this.primaryTlsTrustStorePath = builder.primaryTlsTrustStorePath;
+        this.secondaryTlsTrustStorePaths = builder.secondaryTlsTrustStorePaths;
+        this.primaryTlsTrustStorePassword = 
builder.primaryTlsTrustStorePassword;
+        this.secondaryTlsTrustStorePasswords = 
builder.secondaryTlsTrustStorePasswords;
+        this.failoverDelayNs = builder.failoverDelayNs;
+        this.switchBackDelayNs = builder.switchBackDelayNs;
+        this.currentPulsarServiceUrl = builder.primary;
+        this.recoverTimestamp = -1;
+        this.failedTimestamp = -1;
+        this.intervalMs = builder.checkIntervalMs;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to probe primary cluster active or not
+        this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
+            if (currentPulsarServiceUrl.equals(primary)) {
+                // current service url is primary, probe whether it is down
+                probeAndUpdateServiceUrl(secondary, secondaryAuthentications, 
secondaryTlsTrustCertsFilePaths,
+                        secondaryTlsTrustStorePaths, 
secondaryTlsTrustStorePasswords);
+            } else {
+                // current service url is secondary, probe whether it is down
+                probeAndUpdateServiceUrl(primary, primaryAuthentication, 
primaryTlsTrustCertsFilePath,
+                        primaryTlsTrustStorePath, 
primaryTlsTrustStorePassword);
+                // secondary cluster is up, check whether need to switch back 
to primary
+                probeAndCheckSwitchBack(primary, primaryAuthentication, 
primaryTlsTrustCertsFilePath,
+                        primaryTlsTrustStorePath, 
primaryTlsTrustStorePassword);
+            }
+        }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
+
+    }
+
+    @Override
+    public String getServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    @Override
+    public void close() {
+        this.executor.shutdown();
+    }
+
+    boolean probeAvailable(String url) {
+        try {
+            URI uri = ServiceURI.create(url).getUri();
+            Socket socket = new Socket();
+            socket.connect(new InetSocketAddress(uri.getHost(), 
uri.getPort()), TIMEOUT);
+            socket.close();
+            return true;
+        } catch (Exception e) {
+            log.warn("Failed to probe available, url: {}", url, e);
+            return false;
+        }
+    }
+
+    private static long nanosToMillis(long nanos) {
+        return Math.max(0L, Math.round(nanos / 1_000_000.0d));
+    }
+
+    private void updateServiceUrl(String target,
+                                  Authentication authentication,
+                                  String tlsTrustCertsFilePath,
+                                  String tlsTrustStorePath,
+                                  String tlsTrustStorePassword) {
+        try {
+            if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) {
+                
pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+            }
+
+            if (authentication != null) {
+                pulsarClient.updateAuthentication(authentication);
+            }
+
+            if (!Strings.isNullOrEmpty(tlsTrustStorePath)) {
+                
pulsarClient.updateTlsTrustStorePathAndPassword(tlsTrustStorePath, 
tlsTrustStorePassword);
+            }
+
+            pulsarClient.updateServiceUrl(target);
+            currentPulsarServiceUrl = target;
+        } catch (IOException e) {
+            log.error("Current Pulsar service is {}, "
+                    + "failed to switch back to {} ", currentPulsarServiceUrl, 
target, e);
+        }
+    }
+
+    private void probeAndUpdateServiceUrl(List<String> targetServiceUrls,
+                                          Map<String, Authentication> 
authentications,
+                                          Map<String, String> 
tlsTrustCertsFilePaths,
+                                          Map<String, String> 
tlsTrustStorePaths,
+                                          Map<String, String> 
tlsTrustStorePasswords) {
+        if (probeAvailable(currentPulsarServiceUrl)) {
+            failedTimestamp = -1;
+            return;
+        }
+
+        long currentTimestamp = System.nanoTime();
+        if (failedTimestamp == -1) {
+            failedTimestamp = currentTimestamp;
+        } else if (currentTimestamp - failedTimestamp >= failoverDelayNs) {
+            for (String targetServiceUrl : targetServiceUrls) {
+                if (probeAvailable(targetServiceUrl)) {
+                    log.info("Current Pulsar service is {}, it has been down 
for {} ms, "
+                                    + "switch to the service {}. The current 
service down at {}",
+                            currentPulsarServiceUrl, 
nanosToMillis(currentTimestamp - failedTimestamp),
+                            targetServiceUrl, failedTimestamp);
+                    updateServiceUrl(targetServiceUrl,
+                            authentications != null ? 
authentications.get(targetServiceUrl) : null,
+                            tlsTrustCertsFilePaths != null ? 
tlsTrustCertsFilePaths.get(targetServiceUrl) : null,
+                            tlsTrustStorePaths != null ? 
tlsTrustStorePaths.get(targetServiceUrl) : null,
+                            tlsTrustStorePasswords != null ? 
tlsTrustStorePasswords.get(targetServiceUrl) : null);
+                    failedTimestamp = -1;
+                    break;
+                } else {
+                    log.warn("Current Pulsar service is {}, it has been down 
for {} ms. "
+                                    + "Failed to switch to service {}, "
+                                    + "because it is not available, continue 
to probe next pulsar service.",
+                            currentPulsarServiceUrl, 
nanosToMillis(currentTimestamp - failedTimestamp), targetServiceUrl);
+                }
+            }
+        }
+    }
+
+    private void probeAndUpdateServiceUrl(String targetServiceUrl,
+                                          Authentication authentication,
+                                          String tlsTrustCertsFilePath,
+                                          String tlsTrustStorePath,
+                                          String tlsTrustStorePassword) {
+        if (probeAvailable(currentPulsarServiceUrl)) {
+            failedTimestamp = -1;
+            return;
+        }
+
+        long currentTimestamp = System.nanoTime();
+        if (failedTimestamp == -1) {
+            failedTimestamp = currentTimestamp;
+        } else if (currentTimestamp - failedTimestamp >= failoverDelayNs) {
+            if (probeAvailable(targetServiceUrl)) {
+                log.info("Current Pulsar service is {}, it has been down for 
{} ms, "
+                                + "switch to the service {}. The current 
service down at {}",
+                        currentPulsarServiceUrl, 
nanosToMillis(currentTimestamp - failedTimestamp),
+                        targetServiceUrl, failedTimestamp);
+                updateServiceUrl(targetServiceUrl, authentication, 
tlsTrustCertsFilePath,
+                        tlsTrustStorePath, tlsTrustStorePassword);
+                failedTimestamp = -1;
+            } else {
+                log.error("Current Pulsar service is {}, it has been down for 
{} ms. "
+                                + "Failed to switch to service {}, "
+                                + "because it is not available",
+                        currentPulsarServiceUrl, 
nanosToMillis(currentTimestamp - failedTimestamp),
+                        targetServiceUrl);
+            }
+        }
+    }
+
+    private void probeAndCheckSwitchBack(String target,
+                                         Authentication authentication,
+                                         String tlsTrustCertsFilePath,
+                                         String tlsTrustStorePath,
+                                         String tlsTrustStorePassword) {
+        long currentTimestamp = System.nanoTime();
+        if (!probeAvailable(target)) {
+            recoverTimestamp = -1;
+            return;
+        }
+
+        if (recoverTimestamp == -1) {
+            recoverTimestamp = currentTimestamp;
+        } else if (currentTimestamp - recoverTimestamp >= switchBackDelayNs) {
+            log.info("Current Pulsar service is secondary: {}, "
+                            + "the primary service: {} has been recover for {} 
ms, "
+                            + "switch back to the primary service",
+                    currentPulsarServiceUrl, target, 
nanosToMillis(currentTimestamp - recoverTimestamp));
+            updateServiceUrl(target, authentication, tlsTrustCertsFilePath, 
tlsTrustStorePath, tlsTrustStorePassword);
+            recoverTimestamp = -1;
+        }
+    }
+
+    public static class AutoClusterFailoverBuilderImpl implements 
AutoClusterFailoverBuilder {
+        private String primary;
+        private List<String> secondary;
+        private Authentication primaryAuthentication = null;
+        private Map<String, Authentication> secondaryAuthentications = null;
+        private String primaryTlsTrustCertsFilePath = null;
+        private Map<String, String> secondaryTlsTrustCertsFilePaths = null;
+        private String primaryTlsTrustStorePath = null;
+        private Map<String, String> secondaryTlsTrustStorePaths = null;
+        private String primaryTlsTrustStorePassword = null;
+        private Map<String, String> secondaryTlsTrustStorePasswords = null;
+        private SecondaryChoosePolicy secondaryChoosePolicy = 
SecondaryChoosePolicy.ORDER;
+        private long failoverDelayNs;
+        private long switchBackDelayNs;
+        private long checkIntervalMs = 30_000;
+
+        @Override
+        public AutoClusterFailoverBuilder primary(@NonNull String primary) {
+            this.primary = primary;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder secondary(@NonNull List<String> 
secondary) {
+            this.secondary = secondary;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder secondaryChoosePolicy(@NonNull 
SecondaryChoosePolicy policy) {
+            this.secondaryChoosePolicy = policy;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder primaryAuthentication(Authentication 
authentication) {
+            this.primaryAuthentication = authentication;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder secondaryAuthentication(Map<String, 
Authentication> authentication) {
+            this.secondaryAuthentications = authentication;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder primaryTlsTrustCertsFilePath(String 
tlsTrustCertsFilePath) {
+            this.primaryTlsTrustCertsFilePath = tlsTrustCertsFilePath;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder 
secondaryTlsTrustCertsFilePath(Map<String, String> tlsTrustCertsFilePath) {
+            this.secondaryTlsTrustCertsFilePaths = tlsTrustCertsFilePath;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder primaryTlsTrustStorePath(String 
tlsTrustStorePath) {
+            this.primaryTlsTrustStorePath = tlsTrustStorePath;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder 
secondaryTlsTrustStorePath(Map<String, String> tlsTrustStorePath) {
+            this.secondaryTlsTrustStorePaths = tlsTrustStorePath;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder primaryTlsTrustStorePassword(String 
tlsTrustStorePassword) {
+            this.primaryTlsTrustStorePassword = tlsTrustStorePassword;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder 
secondaryTlsTrustStorePassword(Map<String, String> tlsTrustStorePassword) {
+            this.secondaryTlsTrustStorePasswords = tlsTrustStorePassword;
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder failoverDelay(long failoverDelay, 
TimeUnit timeUnit) {
+            this.failoverDelayNs = timeUnit.toNanos(failoverDelay);
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder switchBackDelay(long 
switchBackDelay, TimeUnit timeUnit) {
+            this.switchBackDelayNs = timeUnit.toNanos(switchBackDelay);
+            return this;
+        }
+
+        @Override
+        public AutoClusterFailoverBuilder checkInterval(long interval, 
TimeUnit timeUnit) {
+            this.checkIntervalMs = timeUnit.toMillis(interval);
+            return this;
+        }
+
+        @Override
+        public ServiceUrlProvider build() {
+            Objects.requireNonNull(primary, "primary service url shouldn't be 
null");
+            checkArgument(secondary != null && secondary.size() > 0,
+                    "secondary cluster service url shouldn't be null and 
should set at least one");
+            checkArgument(failoverDelayNs >= 0, "failoverDelay should >= 0");
+            checkArgument(switchBackDelayNs >= 0, "switchBackDelay should >= 
0");
+            checkArgument(checkIntervalMs >= 0, "checkInterval should >= 0");

Review comment:
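       Should the interval be allowed to be 0? This check accepts 
`checkIntervalMs >= 0`, but the value is used as the period for 
`scheduleAtFixedRate`, which rejects a period of 0, so the check should 
require `> 0`.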

##########
File path: 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+
+public class ControlledClusterFailoverTest {

Review comment:
       Need a test group name

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+public class ControlledClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private volatile ControlledConfiguration currentControlledConfiguration;
+    private final URL pulsarUrlProvider;
+    private final ScheduledExecutorService executor;
+    private long interval;
+    private ObjectMapper objectMapper = null;
+
+    private ControlledClusterFailover(String defaultServiceUrl, String 
urlProvider, long interval) throws IOException {
+        this.currentPulsarServiceUrl = defaultServiceUrl;
+        this.pulsarUrlProvider = new URL(urlProvider);
+        this.interval = interval;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to check service url every 30 seconds
+        this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
+            ControlledConfiguration controlledConfiguration = null;
+            try {
+                controlledConfiguration = fetchControlledConfiguration();
+                if (controlledConfiguration != null
+                        && 
!Strings.isNullOrEmpty(controlledConfiguration.getServiceUrl())
+                        && 
!controlledConfiguration.equals(currentControlledConfiguration)) {
+                    log.info("Switch Pulsar service url from {} to {}",
+                            currentControlledConfiguration, 
controlledConfiguration.toString());
+
+                    Authentication authentication = null;
+                    if 
(!Strings.isNullOrEmpty(controlledConfiguration.authPluginClassName)
+                            && 
!Strings.isNullOrEmpty(controlledConfiguration.getAuthParamsString())) {
+                        authentication = 
AuthenticationFactory.create(controlledConfiguration.getAuthPluginClassName(),
+                                controlledConfiguration.getAuthParamsString());
+                    }
+
+                    String tlsTrustCertsFilePath = 
controlledConfiguration.getTlsTrustCertsFilePath();
+                    String serviceUrl = 
controlledConfiguration.getServiceUrl();
+
+                    if (authentication != null) {
+                        pulsarClient.updateAuthentication(authentication);
+                    }
+
+                    if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) {
+                        
pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+                    }
+
+                    pulsarClient.updateServiceUrl(serviceUrl);
+                    currentPulsarServiceUrl = serviceUrl;
+                    currentControlledConfiguration = controlledConfiguration;
+                }
+            } catch (IOException e) {
+                log.error("Failed to switch new Pulsar url, current: {}, new: 
{}",
+                        currentControlledConfiguration, 
controlledConfiguration, e);
+            }
+        }), interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public String getCurrentPulsarServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    public URL getPulsarUrlProvider() {
+        return this.pulsarUrlProvider;
+    }
+
+    protected ControlledConfiguration fetchControlledConfiguration() throws 
IOException {
+        // call the service to get service URL
+        InputStream inputStream = null;
+        try {
+            URLConnection conn = pulsarUrlProvider.openConnection();
+            inputStream = conn.getInputStream();
+            String jsonStr = new String(IOUtils.toByteArray(inputStream), 
StandardCharsets.UTF_8);
+            ObjectMapper objectMapper = getObjectMapper();
+            return objectMapper.readValue(jsonStr, 
ControlledConfiguration.class);

Review comment:
       Would it be better to use an HTTP client library here? Users might also 
need authentication for the provided HTTP endpoint.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+public class ControlledClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private volatile ControlledConfiguration currentControlledConfiguration;
+    private final URL pulsarUrlProvider;
+    private final ScheduledExecutorService executor;
+    private long interval;
+    private ObjectMapper objectMapper = null;
+
+    private ControlledClusterFailover(String defaultServiceUrl, String 
urlProvider, long interval) throws IOException {
+        this.currentPulsarServiceUrl = defaultServiceUrl;
+        this.pulsarUrlProvider = new URL(urlProvider);
+        this.interval = interval;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to check service url every 30 seconds
+        this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
+            ControlledConfiguration controlledConfiguration = null;
+            try {
+                controlledConfiguration = fetchControlledConfiguration();
+                if (controlledConfiguration != null
+                        && 
!Strings.isNullOrEmpty(controlledConfiguration.getServiceUrl())
+                        && 
!controlledConfiguration.equals(currentControlledConfiguration)) {
+                    log.info("Switch Pulsar service url from {} to {}",
+                            currentControlledConfiguration, 
controlledConfiguration.toString());
+
+                    Authentication authentication = null;
+                    if 
(!Strings.isNullOrEmpty(controlledConfiguration.authPluginClassName)
+                            && 
!Strings.isNullOrEmpty(controlledConfiguration.getAuthParamsString())) {
+                        authentication = 
AuthenticationFactory.create(controlledConfiguration.getAuthPluginClassName(),
+                                controlledConfiguration.getAuthParamsString());
+                    }
+
+                    String tlsTrustCertsFilePath = 
controlledConfiguration.getTlsTrustCertsFilePath();
+                    String serviceUrl = 
controlledConfiguration.getServiceUrl();
+
+                    if (authentication != null) {
+                        pulsarClient.updateAuthentication(authentication);
+                    }
+
+                    if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) {
+                        
pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+                    }
+
+                    pulsarClient.updateServiceUrl(serviceUrl);
+                    currentPulsarServiceUrl = serviceUrl;
+                    currentControlledConfiguration = controlledConfiguration;
+                }
+            } catch (IOException e) {
+                log.error("Failed to switch new Pulsar url, current: {}, new: 
{}",
+                        currentControlledConfiguration, 
controlledConfiguration, e);
+            }
+        }), interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public String getCurrentPulsarServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    public URL getPulsarUrlProvider() {
+        return this.pulsarUrlProvider;
+    }
+
+    protected ControlledConfiguration fetchControlledConfiguration() throws 
IOException {
+        // call the service to get service URL
+        InputStream inputStream = null;
+        try {
+            URLConnection conn = pulsarUrlProvider.openConnection();
+            inputStream = conn.getInputStream();
+            String jsonStr = new String(IOUtils.toByteArray(inputStream), 
StandardCharsets.UTF_8);
+            ObjectMapper objectMapper = getObjectMapper();
+            return objectMapper.readValue(jsonStr, 
ControlledConfiguration.class);
+        } catch (IOException e) {
+            log.warn("Failed to fetch controlled configuration. ", e);
+            return null;
+        } finally {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+    }
+
+    private ObjectMapper getObjectMapper() {
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper();
+        }
+        return objectMapper;
+    }
+
+    @Data
+    protected static class ControlledConfiguration {
+        private String serviceUrl;
+        private String tlsTrustCertsFilePath;
+
+        private String authPluginClassName;
+        private String authParamsString;
+
+        public String toJson() {
+            ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
+            try {
+                return objectMapper.writeValueAsString(this);
+            } catch (JsonProcessingException e) {
+                log.warn("Failed to write as json. ", e);
+                return null;
+            }
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof ControlledConfiguration) {
+                ControlledConfiguration other = (ControlledConfiguration) obj;
+                return Objects.equals(serviceUrl, other.serviceUrl)
+                        && Objects.equals(tlsTrustCertsFilePath, 
other.tlsTrustCertsFilePath)
+                        && Objects.equals(authPluginClassName, 
other.authPluginClassName)
+                        && Objects.equals(authParamsString, 
other.authParamsString);
+            }
+
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(serviceUrl,
+                    tlsTrustCertsFilePath,
+                    authPluginClassName,
+                    authParamsString);
+        }

Review comment:
       `@Data` already generates `equals` and `hashCode`, so these explicit 
overrides are redundant.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+public class ControlledClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private volatile ControlledConfiguration currentControlledConfiguration;
+    private final URL pulsarUrlProvider;
+    private final ScheduledExecutorService executor;
+    private long interval;
+    private ObjectMapper objectMapper = null;
+
+    private ControlledClusterFailover(String defaultServiceUrl, String 
urlProvider, long interval) throws IOException {
+        this.currentPulsarServiceUrl = defaultServiceUrl;
+        this.pulsarUrlProvider = new URL(urlProvider);
+        this.interval = interval;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to check service url every 30 seconds

Review comment:
       This comment is inaccurate: the check period is the configured `interval`, not a fixed 30 seconds.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+public class ControlledClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private volatile ControlledConfiguration currentControlledConfiguration;
+    private final URL pulsarUrlProvider;
+    private final ScheduledExecutorService executor;
+    private long interval;
+    private ObjectMapper objectMapper = null;
+
+    private ControlledClusterFailover(String defaultServiceUrl, String 
urlProvider, long interval) throws IOException {
+        this.currentPulsarServiceUrl = defaultServiceUrl;
+        this.pulsarUrlProvider = new URL(urlProvider);
+        this.interval = interval;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to check service url every 30 seconds
+        this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
+            ControlledConfiguration controlledConfiguration = null;
+            try {
+                controlledConfiguration = fetchControlledConfiguration();
+                if (controlledConfiguration != null
+                        && 
!Strings.isNullOrEmpty(controlledConfiguration.getServiceUrl())
+                        && 
!controlledConfiguration.equals(currentControlledConfiguration)) {
+                    log.info("Switch Pulsar service url from {} to {}",
+                            currentControlledConfiguration, 
controlledConfiguration.toString());
+
+                    Authentication authentication = null;
+                    if 
(!Strings.isNullOrEmpty(controlledConfiguration.authPluginClassName)
+                            && 
!Strings.isNullOrEmpty(controlledConfiguration.getAuthParamsString())) {
+                        authentication = 
AuthenticationFactory.create(controlledConfiguration.getAuthPluginClassName(),
+                                controlledConfiguration.getAuthParamsString());
+                    }
+
+                    String tlsTrustCertsFilePath = 
controlledConfiguration.getTlsTrustCertsFilePath();
+                    String serviceUrl = 
controlledConfiguration.getServiceUrl();
+
+                    if (authentication != null) {
+                        pulsarClient.updateAuthentication(authentication);
+                    }
+
+                    if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) {
+                        
pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+                    }
+
+                    pulsarClient.updateServiceUrl(serviceUrl);
+                    currentPulsarServiceUrl = serviceUrl;
+                    currentControlledConfiguration = controlledConfiguration;
+                }
+            } catch (IOException e) {
+                log.error("Failed to switch new Pulsar url, current: {}, new: 
{}",
+                        currentControlledConfiguration, 
controlledConfiguration, e);
+            }
+        }), interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public String getCurrentPulsarServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    public URL getPulsarUrlProvider() {
+        return this.pulsarUrlProvider;
+    }
+
+    protected ControlledConfiguration fetchControlledConfiguration() throws 
IOException {
+        // call the service to get service URL
+        InputStream inputStream = null;
+        try {
+            URLConnection conn = pulsarUrlProvider.openConnection();
+            inputStream = conn.getInputStream();
+            String jsonStr = new String(IOUtils.toByteArray(inputStream), 
StandardCharsets.UTF_8);
+            ObjectMapper objectMapper = getObjectMapper();
+            return objectMapper.readValue(jsonStr, 
ControlledConfiguration.class);
+        } catch (IOException e) {
+            log.warn("Failed to fetch controlled configuration. ", e);
+            return null;
+        } finally {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+    }
+
+    private ObjectMapper getObjectMapper() {
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper();
+        }
+        return objectMapper;
+    }
+
+    @Data
+    protected static class ControlledConfiguration {
+        private String serviceUrl;
+        private String tlsTrustCertsFilePath;
+
+        private String authPluginClassName;
+        private String authParamsString;

Review comment:
       So, we need different URLs for multiple clients using the 
ControlledClusterFailover. 😭

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+public class ControlledClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private volatile ControlledConfiguration currentControlledConfiguration;
+    private final URL pulsarUrlProvider;
+    private final ScheduledExecutorService executor;
+    private long interval;
+    private ObjectMapper objectMapper = null;
+
+    private ControlledClusterFailover(String defaultServiceUrl, String 
urlProvider, long interval) throws IOException {
+        this.currentPulsarServiceUrl = defaultServiceUrl;
+        this.pulsarUrlProvider = new URL(urlProvider);
+        this.interval = interval;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to check service url every 30 seconds
+        this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
+            ControlledConfiguration controlledConfiguration = null;
+            try {
+                controlledConfiguration = fetchControlledConfiguration();
+                if (controlledConfiguration != null
+                        && 
!Strings.isNullOrEmpty(controlledConfiguration.getServiceUrl())
+                        && 
!controlledConfiguration.equals(currentControlledConfiguration)) {
+                    log.info("Switch Pulsar service url from {} to {}",
+                            currentControlledConfiguration, 
controlledConfiguration.toString());
+
+                    Authentication authentication = null;
+                    if 
(!Strings.isNullOrEmpty(controlledConfiguration.authPluginClassName)
+                            && 
!Strings.isNullOrEmpty(controlledConfiguration.getAuthParamsString())) {
+                        authentication = 
AuthenticationFactory.create(controlledConfiguration.getAuthPluginClassName(),
+                                controlledConfiguration.getAuthParamsString());
+                    }
+
+                    String tlsTrustCertsFilePath = 
controlledConfiguration.getTlsTrustCertsFilePath();
+                    String serviceUrl = 
controlledConfiguration.getServiceUrl();
+
+                    if (authentication != null) {
+                        pulsarClient.updateAuthentication(authentication);
+                    }
+
+                    if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) {
+                        
pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+                    }
+
+                    pulsarClient.updateServiceUrl(serviceUrl);
+                    currentPulsarServiceUrl = serviceUrl;
+                    currentControlledConfiguration = controlledConfiguration;
+                }
+            } catch (IOException e) {
+                log.error("Failed to switch new Pulsar url, current: {}, new: 
{}",
+                        currentControlledConfiguration, 
controlledConfiguration, e);
+            }
+        }), interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public String getCurrentPulsarServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    public URL getPulsarUrlProvider() {
+        return this.pulsarUrlProvider;
+    }
+
+    protected ControlledConfiguration fetchControlledConfiguration() throws 
IOException {
+        // call the service to get service URL
+        InputStream inputStream = null;
+        try {
+            URLConnection conn = pulsarUrlProvider.openConnection();
+            inputStream = conn.getInputStream();
+            String jsonStr = new String(IOUtils.toByteArray(inputStream), 
StandardCharsets.UTF_8);
+            ObjectMapper objectMapper = getObjectMapper();
+            return objectMapper.readValue(jsonStr, 
ControlledConfiguration.class);

Review comment:
       Do we also need to specify the request method and content type expected for this URL?

##########
File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ServiceUrlProvider.java
##########
@@ -51,4 +51,11 @@
      */
     String getServiceUrl();
 
+    /**
+     * Close the resource that the provider allocated.
+     *
+     */
+    default void close() {

Review comment:
       So, we can remove the default empty implementation here?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
##########
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static 
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ControlledClusterFailoverBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Slf4j
+public class ControlledClusterFailover implements ServiceUrlProvider {
+    private PulsarClient pulsarClient;
+    private volatile String currentPulsarServiceUrl;
+    private volatile ControlledConfiguration currentControlledConfiguration;
+    private final URL pulsarUrlProvider;
+    private final ScheduledExecutorService executor;
+    private long interval;
+    private ObjectMapper objectMapper = null;
+
+    private ControlledClusterFailover(String defaultServiceUrl, String 
urlProvider, long interval) throws IOException {
+        this.currentPulsarServiceUrl = defaultServiceUrl;
+        this.pulsarUrlProvider = new URL(urlProvider);
+        this.interval = interval;
+        this.executor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory("pulsar-service-provider"));
+    }
+
+    @Override
+    public void initialize(PulsarClient client) {
+        this.pulsarClient = client;
+
+        // start to check service url every 30 seconds
+        this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
+            ControlledConfiguration controlledConfiguration = null;
+            try {
+                controlledConfiguration = fetchControlledConfiguration();
+                if (controlledConfiguration != null
+                        && 
!Strings.isNullOrEmpty(controlledConfiguration.getServiceUrl())
+                        && 
!controlledConfiguration.equals(currentControlledConfiguration)) {
+                    log.info("Switch Pulsar service url from {} to {}",
+                            currentControlledConfiguration, 
controlledConfiguration.toString());
+
+                    Authentication authentication = null;
+                    if 
(!Strings.isNullOrEmpty(controlledConfiguration.authPluginClassName)
+                            && 
!Strings.isNullOrEmpty(controlledConfiguration.getAuthParamsString())) {
+                        authentication = 
AuthenticationFactory.create(controlledConfiguration.getAuthPluginClassName(),
+                                controlledConfiguration.getAuthParamsString());
+                    }
+
+                    String tlsTrustCertsFilePath = 
controlledConfiguration.getTlsTrustCertsFilePath();
+                    String serviceUrl = 
controlledConfiguration.getServiceUrl();
+
+                    if (authentication != null) {
+                        pulsarClient.updateAuthentication(authentication);
+                    }
+
+                    if (!Strings.isNullOrEmpty(tlsTrustCertsFilePath)) {
+                        
pulsarClient.updateTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+                    }
+
+                    pulsarClient.updateServiceUrl(serviceUrl);
+                    currentPulsarServiceUrl = serviceUrl;
+                    currentControlledConfiguration = controlledConfiguration;
+                }
+            } catch (IOException e) {
+                log.error("Failed to switch new Pulsar url, current: {}, new: 
{}",
+                        currentControlledConfiguration, 
controlledConfiguration, e);
+            }
+        }), interval, interval, TimeUnit.MILLISECONDS);
+    }
+
+    public String getCurrentPulsarServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    public URL getPulsarUrlProvider() {
+        return this.pulsarUrlProvider;
+    }
+
+    protected ControlledConfiguration fetchControlledConfiguration() throws 
IOException {
+        // call the service to get service URL
+        InputStream inputStream = null;
+        try {
+            URLConnection conn = pulsarUrlProvider.openConnection();
+            inputStream = conn.getInputStream();
+            String jsonStr = new String(IOUtils.toByteArray(inputStream), 
StandardCharsets.UTF_8);
+            ObjectMapper objectMapper = getObjectMapper();
+            return objectMapper.readValue(jsonStr, 
ControlledConfiguration.class);
+        } catch (IOException e) {
+            log.warn("Failed to fetch controlled configuration. ", e);
+            return null;
+        } finally {
+            if (inputStream != null) {
+                inputStream.close();
+            }
+        }
+    }
+
+    private ObjectMapper getObjectMapper() {
+        if (objectMapper == null) {
+            objectMapper = new ObjectMapper();
+        }
+        return objectMapper;
+    }
+
+    @Data
+    protected static class ControlledConfiguration {
+        private String serviceUrl;
+        private String tlsTrustCertsFilePath;
+
+        private String authPluginClassName;
+        private String authParamsString;
+
+        public String toJson() {
+            ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();
+            try {
+                return objectMapper.writeValueAsString(this);
+            } catch (JsonProcessingException e) {
+                log.warn("Failed to write as json. ", e);
+                return null;
+            }
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof ControlledConfiguration) {
+                ControlledConfiguration other = (ControlledConfiguration) obj;
+                return Objects.equals(serviceUrl, other.serviceUrl)
+                        && Objects.equals(tlsTrustCertsFilePath, 
other.tlsTrustCertsFilePath)
+                        && Objects.equals(authPluginClassName, 
other.authPluginClassName)
+                        && Objects.equals(authParamsString, 
other.authParamsString);
+            }
+
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(serviceUrl,
+                    tlsTrustCertsFilePath,
+                    authPluginClassName,
+                    authParamsString);
+        }
+    }
+
+    @Override
+    public String getServiceUrl() {
+        return this.currentPulsarServiceUrl;
+    }
+
+    @Override
+    public void close() {
+        this.executor.shutdown();
+    }
+
+    public static class ControlledClusterFailoverBuilderImpl implements 
ControlledClusterFailoverBuilder {
+        private String defaultServiceUrl;
+        private String urlProvider;
+        private long interval = 30_000;
+
+        @Override
+        public ControlledClusterFailoverBuilder defaultServiceUrl(@NonNull 
String serviceUrl) {
+            this.defaultServiceUrl = serviceUrl;
+            return this;
+        }
+
+        @Override
+        public ControlledClusterFailoverBuilder urlProvider(@NonNull String 
urlProvider) {
+            this.urlProvider = urlProvider;
+            return this;
+        }
+
+        @Override
+        public ControlledClusterFailoverBuilder checkInterval(long interval, 
@NonNull TimeUnit timeUnit) {
+            this.interval = timeUnit.toMillis(interval);
+            return this;
+        }
+
+        @Override
+        public ServiceUrlProvider build() throws IOException {
+            Objects.requireNonNull(defaultServiceUrl, "default service url 
shouldn't be null");
+            Objects.requireNonNull(urlProvider, "urlProvider shouldn't be 
null");
+            checkArgument(interval >= 0, "checkInterval should >= 0");

Review comment:
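       This should be `interval > 0`: the value is passed as the period to `scheduleAtFixedRate`, which rejects a period of 0.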




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to