This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 0d2fa2af2fc [fix][branch-2.9] Support zookeeper read-only config
(#19693)
0d2fa2af2fc is described below
commit 0d2fa2af2fce3a3253e4e569c9655c8d2d855db3
Author: Yan Zhao <[email protected]>
AuthorDate: Fri Mar 3 16:15:18 2023 +0800
[fix][branch-2.9] Support zookeeper read-only config (#19693)
Cherry-pick from #19156
---
conf/broker.conf | 3 +++
conf/proxy.conf | 3 +++
.../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 5 +++++
.../java/org/apache/pulsar/broker/resources/PulsarResources.java | 5 +++--
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++--
.../org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java | 5 +++--
.../org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java | 4 +++-
.../test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java | 4 +++-
.../apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java | 4 +++-
.../pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java | 4 +++-
.../pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java | 4 +++-
.../main/java/org/apache/pulsar/functions/worker/WorkerConfig.java | 5 +++++
.../src/main/java/org/apache/pulsar/functions/worker/Worker.java | 3 ++-
.../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java | 6 ++++++
.../src/main/java/org/apache/pulsar/proxy/server/ProxyService.java | 4 ++--
.../main/java/org/apache/pulsar/websocket/WebSocketService.java | 7 ++++---
19 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index c6901ebebc7..4ecdba5bb02 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -116,6 +116,9 @@ zooKeeperOperationTimeoutSeconds=30
# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300
+# Whether ZooKeeper read-only operations are allowed.
+zookeeperAllowReadOnlyOperations=false
+
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 77ab31b80cf..9186a35f164 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -44,6 +44,9 @@ zookeeperSessionTimeoutMs=30000
# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300
+# Whether ZooKeeper read-only operations are allowed.
+zookeeperAllowReadOnlyOperations=false
+
### --- Server --- ###
# Hostname or IP address the service binds on, default is 0.0.0.0.
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 458511a28fc..d4912ae9fb2 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -348,6 +348,11 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Is zookeeper allow read-only operations"
+ )
+ private boolean zookeeperAllowReadOnlyOperations = false;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a01d57817ee..f8d6ba43045 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -91,9 +91,10 @@ public class PulsarResources {
this.configurationMetadataStore =
Optional.ofNullable(configurationMetadataStore);
}
- public static MetadataStoreExtended createMetadataStore(String serverUrls,
int sessionTimeoutMs)
+ public static MetadataStoreExtended createMetadataStore(String serverUrls,
int sessionTimeoutMs,
+ boolean
allowReadOnlyOperations)
throws MetadataStoreException {
return MetadataStoreExtended.create(serverUrls,
MetadataStoreConfig.builder()
-
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+
.sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 052ee90b536..38e8f34ab46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -352,7 +352,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return
MetadataStoreFactory.create(config.getConfigurationStoreServers(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getZooKeeperSessionTimeoutMillis())
- .allowReadOnlyOperations(false)
+
.allowReadOnlyOperations(config.isZookeeperAllowReadOnlyOperations())
.build());
}
@@ -963,7 +963,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
return MetadataStoreExtended.create(config.getZookeeperServers(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getZooKeeperSessionTimeoutMillis())
- .allowReadOnlyOperations(false)
+
.allowReadOnlyOperations(config.isZookeeperAllowReadOnlyOperations())
.build());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index b848fa76d54..d0fdfbdb019 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -88,7 +88,8 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a2758b72a4e..984cec472a0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.websocket.proxy;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,8 @@ public class ProxyAuthorizationTest extends
MockedPulsarServiceBaseTest {
config.setWebServicePort(Optional.of(0));
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 184f86340fa..a9687411bd2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.websocket.proxy;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,8 @@ public class ProxyConfigurationTest extends
ProducerConsumerBase {
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs",
"100");
WebSocketService service =
spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
index 71f8c0b6d86..738c9d898f9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -64,7 +65,8 @@ public class ProxyIdleTimeoutTest extends
ProducerConsumerBase {
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
index 6e0dfa46cfd..831b09660f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -66,7 +67,8 @@ public class ProxyPingTest extends ProducerConsumerBase {
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
config.setWebSocketPingDurationSeconds(2);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 1e74cdae787..11a0384cf5a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -105,7 +106,8 @@ public class ProxyPublishConsumeTest extends
ProducerConsumerBase {
config.setClusterName("test");
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index cdc2eb58d9a..5e7956599ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -78,7 +79,8 @@ public class ProxyPublishConsumeTlsTest extends
TlsProducerConsumerBase {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 5baaacd52d9..c624e57ffb2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -65,7 +66,8 @@ public class ProxyPublishConsumeWithoutZKTest extends
ProducerConsumerBase {
config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 03227e9587d..627f9db3423 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy.v1;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -88,7 +89,8 @@ public class V1_ProxyAuthenticationTest extends
V1_ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class,
config);
- doReturn(new
ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(),
anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index e51e5dfb742..25f6eb1d98f 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -163,6 +163,11 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Is zookeeper allow read-only operations"
+ )
+ private boolean zookeeperAllowReadOnlyOperations = false;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index df3ce761fce..e9944ac95e4 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -77,7 +77,8 @@ public class Worker {
log.info("starting configuration cache service");
try {
configMetadataStore =
PulsarResources.createMetadataStore(workerConfig.getConfigurationStoreServers(),
- (int) workerConfig.getZooKeeperSessionTimeoutMillis());
+ (int) workerConfig.getZooKeeperSessionTimeoutMillis(),
+ workerConfig.isZookeeperAllowReadOnlyOperations());
} catch (IOException e) {
throw new PulsarServerException(e);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e75db662a75..12ead928f5e 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -104,6 +104,12 @@ public class ProxyConfiguration implements
PulsarConfiguration {
)
private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_BROKER_DISCOVERY,
+ doc = "Is zookeeper allow read-only operations"
+ )
+ private boolean zookeeperAllowReadOnlyOperations = false;
+
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The service url points to the broker cluster"
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 10e122e794d..6b11a3e805a 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -378,12 +378,12 @@ public class ProxyService implements Closeable {
public MetadataStoreExtended createLocalMetadataStore() throws
MetadataStoreException {
return
PulsarResources.createMetadataStore(proxyConfig.getZookeeperServers(),
- proxyConfig.getZookeeperSessionTimeoutMs());
+ proxyConfig.getZookeeperSessionTimeoutMs(),
proxyConfig.isZookeeperAllowReadOnlyOperations());
}
public MetadataStoreExtended createConfigurationMetadataStore() throws
MetadataStoreException {
return
PulsarResources.createMetadataStore(proxyConfig.getConfigurationStoreServers(),
- proxyConfig.getZookeeperSessionTimeoutMs());
+ proxyConfig.getZookeeperSessionTimeoutMs(),
proxyConfig.isZookeeperAllowReadOnlyOperations());
}
public Authentication getProxyClientAuthenticationPlugin() {
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 843e3a8ec82..929ee0826b0 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -103,7 +103,7 @@ public class WebSocketService implements Closeable {
if (isNotBlank(config.getConfigurationStoreServers())) {
try {
configMetadataStore =
createMetadataStore(config.getConfigurationStoreServers(),
- (int) config.getZooKeeperSessionTimeoutMillis());
+ (int) config.getZooKeeperSessionTimeoutMillis(),
config.isZookeeperAllowReadOnlyOperations());
} catch (MetadataStoreException e) {
throw new PulsarServerException(e);
}
@@ -123,9 +123,10 @@ public class WebSocketService implements Closeable {
log.info("Pulsar WebSocket Service started");
}
- public MetadataStoreExtended createMetadataStore(String serverUrls, int
sessionTimeoutMs)
+ public MetadataStoreExtended createMetadataStore(String serverUrls, int
sessionTimeoutMs,
+ boolean
isAllowReadOnlyOperations)
throws MetadataStoreException {
- return PulsarResources.createMetadataStore(serverUrls,
sessionTimeoutMs);
+ return PulsarResources.createMetadataStore(serverUrls,
sessionTimeoutMs, isAllowReadOnlyOperations);
}
@Override