This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d4fa00b005 [improve][cleanup] Keep usage of deprecated configs inside 
ServiceConfiguration (#16492)
3d4fa00b005 is described below

commit 3d4fa00b0059744c3d945c77672b9769c335014c
Author: tison <[email protected]>
AuthorDate: Tue Jul 19 14:36:36 2022 +0800

    [improve][cleanup] Keep usage of deprecated configs inside 
ServiceConfiguration (#16492)
    
    ### Motivation
    Clean up code that was noticed while reading through it.
    
    ### Modifications
    Encapsulate usage of deprecated configs:
    - subscriptionKeySharedEnable
    - zookeeperServers
---
 .../apache/pulsar/broker/ServiceConfiguration.java | 36 +++++++++++---------
 .../configuration/PulsarConfigurationLoader.java   |  2 +-
 .../java/org/apache/pulsar/PulsarStandalone.java   |  3 +-
 .../apache/pulsar/broker/SLAMonitoringTest.java    |  2 +-
 .../AntiAffinityNamespaceGroupTest.java            |  4 +--
 .../loadbalance/LeaderElectionServiceTest.java     |  2 +-
 .../broker/loadbalance/LoadBalancerTest.java       |  2 +-
 .../loadbalance/ModularLoadManagerImplTest.java    |  6 ++--
 .../broker/loadbalance/SimpleBrokerStartTest.java  |  4 +--
 .../loadbalance/SimpleLoadManagerImplTest.java     |  4 +--
 .../loadbalance/impl/BundleSplitterTaskTest.java   |  2 +-
 .../broker/service/AdvertisedAddressTest.java      |  2 +-
 .../broker/service/BacklogQuotaManagerTest.java    |  2 +-
 .../pulsar/broker/service/BkEnsemblesTestBase.java |  2 +-
 .../broker/service/BrokerBookieIsolationTest.java  | 10 +++---
 .../pulsar/broker/service/TopicOwnerTest.java      |  2 +-
 .../coordinator/TransactionMetaStoreTestBase.java  |  2 +-
 .../client/api/ClientDeduplicationFailureTest.java |  2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 38 +++++++---------------
 .../common/naming/ServiceConfigurationTest.java    | 13 ++++++--
 .../worker/PulsarFunctionE2ESecurityTest.java      |  2 +-
 .../worker/PulsarFunctionLocalRunTest.java         |  2 +-
 .../worker/PulsarFunctionPublishTest.java          |  2 +-
 .../functions/worker/PulsarFunctionTlsTest.java    |  2 +-
 .../worker/PulsarWorkerAssignmentTest.java         |  2 +-
 .../apache/pulsar/io/AbstractPulsarE2ETest.java    |  2 +-
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |  2 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  2 +-
 28 files changed, 78 insertions(+), 78 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c90c04e8eda..0e7eca59828 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -506,7 +506,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @Deprecated
     @FieldContext(
         category = CATEGORY_POLICIES,
-        doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\""
+        doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead."
     )
     private double backlogQuotaDefaultLimitGB = -1;
 
@@ -646,7 +646,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     @FieldContext(
         category = CATEGORY_POLICIES,
         dynamic = true,
-        doc = "Enable Key_Shared subscription (default is enabled)"
+        doc = "Enable Key_Shared subscription (default is enabled).\n"
+            + "@deprecated - use subscriptionTypesEnabled instead."
     )
     private boolean subscriptionKeySharedEnable = true;
 
@@ -1903,7 +1904,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         deprecated = true,
         doc = "Max number of `acknowledgment holes` that can be stored in 
Zookeeper.\n\n"
             + "If number of unack message range is higher than this limit then 
broker will persist"
-            + " unacked ranges into bookkeeper to avoid additional data 
overhead into zookeeper.")
+            + " unacked ranges into bookkeeper to avoid additional data 
overhead into zookeeper.\n"
+            + "@deprecated - use 
managedLedgerMaxUnackedRangesToPersistInMetadataStore.")
     private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = -1;
     @FieldContext(
             category = CATEGORY_STORAGE_ML,
@@ -2312,7 +2314,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         category = CATEGORY_POLICIES,
         deprecated = true,
         doc = "How often broker checks for inactive topics to be deleted 
(topics with no subscriptions and no one "
-                + "connected) Deprecated in favor of using 
`brokerDeleteInactiveTopicsFrequencySeconds`"
+                + "connected) Deprecated in favor of using 
`brokerDeleteInactiveTopicsFrequencySeconds`\n"
+                + "@deprecated - unused."
     )
     private int brokerServicePurgeInactiveFrequencyInSeconds = 60;
     @FieldContext(
@@ -2864,12 +2867,22 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private Set<String> additionalServlets = new TreeSet<>();
 
+    /**
+     * @deprecated Use {@link #getSubscriptionTypesEnabled()} instead
+     */
+    @Deprecated
+    public boolean isSubscriptionKeySharedEnable() {
+        return subscriptionKeySharedEnable && 
subscriptionTypesEnabled.contains("Key_Shared");
+    }
+
     public String getMetadataStoreUrl() {
         if (StringUtils.isNotBlank(metadataStoreUrl)) {
             return metadataStoreUrl;
-        } else {
+        } else if (StringUtils.isNotBlank(zookeeperServers)) {
             // Fallback to old setting
-            return zookeeperServers;
+            return ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zookeeperServers;
+        } else {
+            return "";
         }
     }
 
@@ -2907,15 +2920,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         } else {
             // Fallback to same metadata service used by broker, adding the 
"metadata-store" to specify the BK
             // metadata adapter
-            String suffix;
-            if (StringUtils.isNotBlank(metadataStoreUrl)) {
-                suffix = metadataStoreUrl;
-            } else {
-                // Fallback to old setting
-                // Note: chroot is not settable by using 'zookeeperServers' 
config.
-                suffix = ZKMetadataStore.ZK_SCHEME_IDENTIFIER + 
zookeeperServers;
-            }
-            return "metadata-store:" + suffix + 
BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
+            // Note: chroot is not settable by using 'zookeeperServers' config.
+            return "metadata-store:" + getMetadataStoreUrl() + 
BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
         }
     }
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 117681c87fe..12bba1a7fa2 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -91,7 +91,7 @@ public class PulsarConfigurationLoader {
     public static <T extends PulsarConfiguration> T create(Properties 
properties,
             Class<? extends PulsarConfiguration> clazz) throws IOException, 
IllegalArgumentException {
         requireNonNull(properties);
-        T configuration = null;
+        T configuration;
         try {
             configuration = (T) clazz.getDeclaredConstructor().newInstance();
             configuration.setProperties(properties);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 09acd1edc0e..b8c175c0a4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -458,8 +458,7 @@ public class PulsarStandalone implements AutoCloseable {
                 this.getNumOfBk(), this.getZkPort(), this.getBkPort(), 
this.getStreamStoragePort(), this.getZkDir(),
                 this.getBkDir(), this.isWipeData(), "127.0.0.1");
         bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
-
-        config.setZookeeperServers("127.0.0.1:" + zkPort);
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
     }
 
     private static void processTerminator(int exitCode) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index af9aeb7eabf..749eeb7da3c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -84,7 +84,7 @@ public class SLAMonitoringTest {
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             configurations[i] = config;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 7595499e473..bafb523fb8f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -109,7 +109,7 @@ public class AntiAffinityNamespaceGroupTest {
         
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config1.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config1.setBrokerServicePort(Optional.of(0));
@@ -131,7 +131,7 @@ public class AntiAffinityNamespaceGroupTest {
         
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config2.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config2.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 925879d13b7..502cc56818a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -74,7 +74,7 @@ public class LeaderElectionServiceTest {
         config.setWebServicePort(Optional.of(0));
         config.setClusterName(clusterName);
         config.setAdvertisedAddress("localhost");
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         @Cleanup
         PulsarService pulsar = 
spyWithClassAndConstructorArgs(MockPulsarService.class, config);
         pulsar.start();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 2e97cc43f43..a241f754d74 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -123,7 +123,7 @@ public class LoadBalancerTest {
             config.setWebServicePort(Optional.of(0));
             config.setBrokerServicePortTls(Optional.of(0));
             config.setWebServicePortTls(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setBrokerShutdownTimeoutMs(0L);
             
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
             config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 3cf7db8e674..12f95a2b796 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -147,7 +147,7 @@ public class ModularLoadManagerImplTest {
         
config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config1.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
 
         config1.setAdvertisedAddress("localhost");
         config1.setBrokerShutdownTimeoutMs(0L);
@@ -168,7 +168,7 @@ public class ModularLoadManagerImplTest {
         
config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config2.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config2.setAdvertisedAddress("localhost");
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -591,7 +591,7 @@ public class ModularLoadManagerImplTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index a4e742b2bb6..c01aaec666b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -47,7 +47,7 @@ public class SimpleBrokerStartTest {
         ServiceConfiguration config = spy(ServiceConfiguration.class);
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -75,7 +75,7 @@ public class SimpleBrokerStartTest {
         ServiceConfiguration config = spy(ServiceConfiguration.class);
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6b8711addd4..ba8035d9307 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -109,7 +109,7 @@ public class SimpleLoadManagerImplTest {
         ServiceConfiguration config1 = spy(ServiceConfiguration.class);
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config1.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config1.setBrokerServicePort(Optional.of(0));
@@ -129,7 +129,7 @@ public class SimpleLoadManagerImplTest {
         ServiceConfiguration config2 = new ServiceConfiguration();
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config2.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config2.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 21e4cfc666d..d4c3fced4ae 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -60,7 +60,7 @@ public class BundleSplitterTaskTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
 
         config.setAdvertisedAddress("localhost");
         config.setBrokerShutdownTimeoutMs(0L);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 21dd37097b4..0385574b109 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -49,7 +49,7 @@ public class AdvertisedAddressTest {
         ServiceConfiguration config = new ServiceConfiguration();
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setWebServicePort(Optional.ofNullable(0));
         config.setClusterName("usc");
         config.setAdvertisedAddress("localhost");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 57d5f0dfd50..a71eeea087d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -94,7 +94,7 @@ public class BacklogQuotaManagerTest {
 
             // start pulsar service
             config = new ServiceConfiguration();
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setClusterName("usc");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index b1db3033a8d..66ea7d9522c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -76,7 +76,7 @@ public abstract class BkEnsemblesTestBase extends 
TestRetrySupport {
             if (config == null) {
                 config = new ServiceConfiguration();
             }
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setClusterName("usc");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index a05248ac808..e575154a743 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -148,7 +148,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -301,7 +301,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -442,7 +442,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -602,7 +602,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -743,7 +743,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index e3f106893b9..d8dadaf8b5b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -97,7 +97,7 @@ public class TopicOwnerTest {
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             configurations[i] = config;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 2f462d72e48..e69d6ba630b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -71,7 +71,7 @@ public abstract class TransactionMetaStoreTestBase extends 
TestRetrySupport {
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             config.setAcknowledgmentAtBatchIndexLevelEnabled(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 7d9c1c77b74..0d8e87d5151 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -84,7 +84,7 @@ public class ClientDeduplicationFailureTest {
         config = spy(ServiceConfiguration.class);
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4b899f2ee0c..c5d716de011 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -112,6 +112,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        super.resetConfig();
         super.internalSetup();
         super.producerBaseSetup();
         this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
@@ -129,7 +130,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "data")
     public void 
testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String 
topicType, boolean enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = topicType + "://public/default/key_shared-" + 
UUID.randomUUID();
 
         @Cleanup
@@ -155,9 +155,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     }
 
     @Test(dataProvider = "data")
-    public void testSendAndReceiveWithBatching(String topicType, boolean 
enableBatch)
-            throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testSendAndReceiveWithBatching(String topicType, boolean 
enableBatch) throws Exception {
         String topic = topicType + "://public/default/key_shared-" + 
UUID.randomUUID();
 
         @Cleanup
@@ -203,7 +201,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "batch")
     public void 
testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean 
enableBatch) throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_exclusive-" + 
UUID.randomUUID();
 
         @Cleanup
@@ -253,10 +250,10 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     }
 
     @Test(dataProvider = "data")
-    public void 
testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String
 topicType,
-            boolean enableBatch) throws PulsarClientException, 
InterruptedException {
-
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void 
testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
+        String topicType,
+        boolean enableBatch
+    ) throws PulsarClientException, InterruptedException {
         String topic = topicType + 
"://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
 
         @Cleanup
@@ -297,10 +294,10 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     }
 
     @Test(dataProvider = "data")
-    public void 
testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String 
topicType,
-                                                                               
         boolean enableBatch)
-            throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
+        String topicType,
+        boolean enableBatch
+    ) throws PulsarClientException {
         String topic = topicType + "://public/default/key_shared_none_key-" + 
UUID.randomUUID();
 
         @Cleanup
@@ -327,7 +324,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "batch")
     public void 
testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean 
enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = 
"persistent://public/default/key_shared_none_key_exclusive-" + 
UUID.randomUUID();
 
         @Cleanup
@@ -366,7 +362,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "batch")
     public void 
testOrderingKeyWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean 
enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_ordering_key-" 
+ UUID.randomUUID();
 
         @Cleanup
@@ -395,7 +390,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "batch")
     public void 
testOrderingKeyWithHashRangeExclusiveStickyKeyConsumerSelector(boolean 
enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = 
"persistent://public/default/key_shared_exclusive_ordering_key-" + 
UUID.randomUUID();
 
         @Cleanup
@@ -446,7 +440,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
     public void testDisableKeySharedSubscription() throws 
PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(false);
+        this.conf.getSubscriptionTypesEnabled().remove("Key_Shared");
         String topic = "persistent://public/default/key_shared_disabled";
         pulsarClient.newConsumer()
             .topic(topic)
@@ -458,7 +452,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test
     public void testCannotUseAcknowledgeCumulative() throws 
PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = 
"persistent://public/default/key_shared_ack_cumulative-" + UUID.randomUUID();
 
         @Cleanup
@@ -484,7 +477,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test(dataProvider = "batch")
     public void testMakingProgressWithSlowerConsumer(boolean enableBatch) 
throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testMakingProgressWithSlowerConsumer-" + 
UUID.randomUUID();
 
         String slowKey = "slowKey";
@@ -553,7 +545,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test
     public void testOrderingWhenAddingConsumers() throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testOrderingWhenAddingConsumers-" + UUID.randomUUID();
 
         @Cleanup
@@ -595,7 +586,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test
     public void testReadAheadWhenAddingConsumers() throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
 
         @Cleanup
@@ -649,7 +639,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test
     public void testRemoveFirstConsumer() throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
 
         @Cleanup
@@ -707,7 +696,6 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
     @Test
     public void testHashRangeConflict() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         final String topic = 
"persistent://public/default/testHashRangeConflict-" + 
UUID.randomUUID().toString();
         final String sub = "test";
 
@@ -796,9 +784,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     }
 
     @Test
-    public void testAttachKeyToMessageMetadata()
-            throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testAttachKeyToMessageMetadata() throws PulsarClientException {
         String topic = "persistent://public/default/key_shared-" + 
UUID.randomUUID();
 
         @Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 95e4ecc577b..1c7ce107468 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -135,8 +135,8 @@ public class ServiceConfigurationTest {
         InputStream stream = new ByteArrayInputStream(confFile.getBytes());
         final ServiceConfiguration conf = 
PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
 
-        assertEquals(conf.getMetadataStoreUrl(), "zk1:2181");
-        assertEquals(conf.getConfigurationMetadataStoreUrl(), "zk1:2181");
+        assertEquals(conf.getMetadataStoreUrl(), "zk:zk1:2181");
+        assertEquals(conf.getConfigurationMetadataStoreUrl(), "zk:zk1:2181");
         assertEquals(conf.getBookkeeperMetadataStoreUrl(), 
"metadata-store:zk:zk1:2181/ledgers");
         assertFalse(conf.isConfigurationStoreSeparated());
         assertFalse(conf.isBookkeeperMetadataStoreSeparated());
@@ -257,6 +257,15 @@ public class ServiceConfigurationTest {
         }
     }
 
+    @Test
+    public void testSubscriptionTypesEnableWins() throws Exception {
+        final Properties properties = new Properties();
+        properties.setProperty("subscriptionKeySharedEnable", "true");
+        properties.setProperty("subscriptionTypesEnabled", "Exclusive,Shared,Failover");
+        final ServiceConfiguration conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
+        assertFalse(conf.isSubscriptionKeySharedEnable());
+    }
+
     /**
      * Verify transaction batch log configuration load correct, cover these 
cases:
      *   1. broker.conf. This is default value. If the property is not 
configured in the file, the default value
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 45502165180..6a4430e2669 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -128,7 +128,7 @@ public class PulsarFunctionE2ESecurityTest {
         Set<String> superUsers = Sets.newHashSet(ADMIN_SUBJECT);
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index e810a1d7d93..2296a6890bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -209,7 +209,7 @@ public class PulsarFunctionLocalRunTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 5d830a67778..33366fa888e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -125,7 +125,7 @@ public class PulsarFunctionPublishTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 227088019f0..4b7d5a6cb07 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -97,7 +97,7 @@ public class PulsarFunctionTlsTest {
             config.setBrokerServicePortTls(Optional.of(brokerPort));
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
-            config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             Set<String> superUsers = Sets.newHashSet("superUser", "admin");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 9ba2ccde471..66a0b31ee98 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -92,7 +92,7 @@ public class PulsarWorkerAssignmentTest {
         final Set<String> superUsers = Sets.newHashSet("superUser", "admin");
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index da748bafea5..5009fca0195 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -117,7 +117,7 @@ public abstract class AbstractPulsarE2ETest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index afe16c63ec2..054bd4bce22 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -99,7 +99,7 @@ public class PulsarFunctionAdminTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2cb0e62f8e3..04467b5f204 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -113,7 +113,7 @@ public class PulsarFunctionTlsTest {
         config.setClusterName("use");
         Set<String> superUsers = Sets.newHashSet("superUser", "admin");
         config.setSuperUserRoles(superUsers);
-        config.setZookeeperServers("127.0.0.1" + ":" + 
bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);

Reply via email to