This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch xiangying/cherry-pick-2.10/pulsar-broker/zookeeper_read-only_config
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b78f828e3a875d9ec5928df1ee539354a1664c8d
Author: Yan Zhao <[email protected]>
AuthorDate: Sat Jan 14 18:56:05 2023 +0800

    [fix][broker] Support zookeeper read-only config. (#19156)
    
    (cherry picked from commit accae9f371f87dcf0b2a2f8d4ed277b742bfdc5f)
---
 .../java/org/apache/pulsar/broker/ServiceConfiguration.java  |  6 ++++++
 .../org/apache/pulsar/broker/resources/PulsarResources.java  | 12 ++++++++++--
 .../main/java/org/apache/pulsar/broker/PulsarService.java    |  4 ++--
 .../pulsar/websocket/proxy/ProxyAuthenticationTest.java      |  4 +++-
 .../pulsar/websocket/proxy/ProxyAuthorizationTest.java       |  4 +++-
 .../pulsar/websocket/proxy/ProxyConfigurationTest.java       |  4 +++-
 .../pulsar/websocket/proxy/ProxyPublishConsumeTest.java      |  4 +++-
 .../pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java   |  4 +++-
 .../websocket/proxy/ProxyPublishConsumeWithoutZKTest.java    |  4 +++-
 .../websocket/proxy/v1/V1_ProxyAuthenticationTest.java       |  4 +++-
 .../org/apache/pulsar/functions/worker/WorkerConfig.java     |  6 ++++++
 .../main/java/org/apache/pulsar/functions/worker/Worker.java |  3 ++-
 .../org/apache/pulsar/proxy/server/ProxyConfiguration.java   |  6 ++++++
 .../java/org/apache/pulsar/proxy/server/ProxyService.java    |  8 +++++---
 .../java/org/apache/pulsar/websocket/WebSocketService.java   | 10 ++++++----
 15 files changed, 64 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b0a8581aac9..6337d3cf7e4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -417,6 +417,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         )
     private int zooKeeperCacheExpirySeconds = -1;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Is zookeeper allow read-only operations."
+    )
+    private boolean zooKeeperAllowReadOnlyOperations;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a087d8090d3..3c1a8fb41df 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -88,9 +88,17 @@ public class PulsarResources {
         this.configurationMetadataStore = 
Optional.ofNullable(configurationMetadataStore);
     }
 
-    public static MetadataStoreExtended createMetadataStore(String serverUrls, 
int sessionTimeoutMs)
+    public static MetadataStoreExtended createMetadataStore(String serverUrls, 
int sessionTimeoutMs,
+                                                            boolean 
allowReadOnlyOperations)
             throws MetadataStoreException {
         return MetadataStoreExtended.create(serverUrls, 
MetadataStoreConfig.builder()
-                
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+                
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
+    }
+
+    public static MetadataStoreExtended createConfigMetadataStore(String 
serverUrls, int sessionTimeoutMs,
+                                                                  boolean 
allowReadOnlyOperations)
+            throws MetadataStoreException {
+        return MetadataStoreExtended.create(serverUrls, 
MetadataStoreConfig.builder()
+                
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2bda5c55398..e2034bdc60b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -347,7 +347,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return 
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) 
config.getMetadataStoreSessionTimeoutMillis())
-                        .allowReadOnlyOperations(false)
+                        
.allowReadOnlyOperations(config.isZooKeeperAllowReadOnlyOperations())
                         .configFilePath(config.getMetadataStoreConfigPath())
                         
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
                         
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
@@ -980,7 +980,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) 
config.getMetadataStoreSessionTimeoutMillis())
-                        .allowReadOnlyOperations(false)
+                        
.allowReadOnlyOperations(config.isZooKeeperAllowReadOnlyOperations())
                         .configFilePath(config.getMetadataStoreConfigPath())
                         
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
                         
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index a34ec879ba6..49ba21ce388 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -83,7 +84,8 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 7e0ee1bd466..a62f4751459 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.websocket.proxy;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -64,7 +65,8 @@ public class ProxyAuthorizationTest extends 
MockedPulsarServiceBaseTest {
         config.setWebServicePort(Optional.of(0));
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         service.start();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 26cf8a0e154..8ff00cf0ce9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.websocket.proxy;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -67,7 +68,8 @@ public class ProxyConfigurationTest extends 
ProducerConsumerBase {
         config.setServiceUrl("http://localhost:8080");
         config.getProperties().setProperty("brokerClient_lookupTimeoutMs", 
"100");
         WebSocketService service = 
spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         service.start();
 
         PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 918640642ec..02d9ceb1c1a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -101,7 +102,8 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
         config.setClusterName("test");
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index a8b67416107..b98414a9270 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -74,7 +75,8 @@ public class ProxyPublishConsumeTlsTest extends 
TlsProducerConsumerBase {
         
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 1fb12645e5e..dadc246c98e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -62,7 +63,8 @@ public class ProxyPublishConsumeWithoutZKTest extends 
ProducerConsumerBase {
         config.setServiceUrl(pulsar.getSafeWebServiceAddress());
         config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
         service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), 
anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index b80c3fb07be..af9b00e5921 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy.v1;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -85,7 +86,8 @@ public class V1_ProxyAuthenticationTest extends 
V1_ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, 
config);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
 anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), 
anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a55f49bfc20..27a6e8eea51 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -205,6 +205,12 @@ public class WorkerConfig implements Serializable, 
PulsarConfiguration {
     )
     private int zooKeeperCacheExpirySeconds = -1;
 
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Is zooKeeper allow read-only operations."
+    )
+    private boolean zooKeeperAllowReadOnlyOperations;
+
     @FieldContext(
         category = CATEGORY_CONNECTORS,
         doc = "The path to the location to locate builtin connectors"
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index f8871cb86e7..189daae348f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -75,7 +75,8 @@ public class Worker {
             try {
                 configMetadataStore = PulsarResources.createMetadataStore(
                         workerConfig.getConfigurationMetadataStoreUrl(),
-                        (int) 
workerConfig.getMetadataStoreSessionTimeoutMillis());
+                        (int) 
workerConfig.getMetadataStoreSessionTimeoutMillis(),
+                        workerConfig.isZooKeeperAllowReadOnlyOperations());
             } catch (IOException e) {
                 throw new PulsarServerException(e);
             }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index d139bb83002..a3183eb749c 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -148,6 +148,12 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
     )
     private int zooKeeperCacheExpirySeconds = -1;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Is zooKeeper allow read-only operations."
+    )
+    private boolean zooKeeperAllowReadOnlyOperations;
+
     @FieldContext(
         category = CATEGORY_BROKER_DISCOVERY,
         doc = "The service url points to the broker cluster"
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 1960b5143a0..c29e2ba1696 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -427,12 +427,14 @@ public class ProxyService implements Closeable {
 
     public MetadataStoreExtended createLocalMetadataStore() throws 
MetadataStoreException {
         return 
PulsarResources.createMetadataStore(proxyConfig.getMetadataStoreUrl(),
-                proxyConfig.getMetadataStoreSessionTimeoutMillis());
+                proxyConfig.getMetadataStoreSessionTimeoutMillis(),
+                proxyConfig.isZooKeeperAllowReadOnlyOperations());
     }
 
     public MetadataStoreExtended createConfigurationMetadataStore() throws 
MetadataStoreException {
-        return 
PulsarResources.createMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
-                proxyConfig.getMetadataStoreSessionTimeoutMillis());
+        return 
PulsarResources.createConfigMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
+                proxyConfig.getMetadataStoreSessionTimeoutMillis(),
+                proxyConfig.isZooKeeperAllowReadOnlyOperations());
     }
 
     public Authentication getProxyClientAuthenticationPlugin() {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index a57c6c491e7..09be1b70264 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -102,8 +102,9 @@ public class WebSocketService implements Closeable {
 
         if (isNotBlank(config.getConfigurationMetadataStoreUrl())) {
             try {
-                configMetadataStore = 
createMetadataStore(config.getConfigurationMetadataStoreUrl(),
-                        (int) config.getMetadataStoreSessionTimeoutMillis());
+                configMetadataStore = 
createConfigMetadataStore(config.getConfigurationMetadataStoreUrl(),
+                        (int) config.getMetadataStoreSessionTimeoutMillis(),
+                        config.isZooKeeperAllowReadOnlyOperations());
             } catch (MetadataStoreException e) {
                 throw new PulsarServerException(e);
             }
@@ -123,9 +124,10 @@ public class WebSocketService implements Closeable {
         log.info("Pulsar WebSocket Service started");
     }
 
-    public MetadataStoreExtended createMetadataStore(String serverUrls, int 
sessionTimeoutMs)
+    public MetadataStoreExtended createConfigMetadataStore(String serverUrls, 
int sessionTimeoutMs, boolean
+            isAllowReadOnlyOperations)
             throws MetadataStoreException {
-        return PulsarResources.createMetadataStore(serverUrls, 
sessionTimeoutMs);
+        return PulsarResources.createConfigMetadataStore(serverUrls, 
sessionTimeoutMs, isAllowReadOnlyOperations);
     }
 
     @Override

Reply via email to