This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch xiangying/cherry-pick-2.10/pulsar-broker/zookeeper_read-only_config in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b78f828e3a875d9ec5928df1ee539354a1664c8d Author: Yan Zhao <[email protected]> AuthorDate: Sat Jan 14 18:56:05 2023 +0800 [fix][broker] Support zookeeper read-only config. (#19156) (cherry picked from commit accae9f371f87dcf0b2a2f8d4ed277b742bfdc5f) --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++ .../org/apache/pulsar/broker/resources/PulsarResources.java | 12 ++++++++++-- .../main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++-- .../pulsar/websocket/proxy/ProxyAuthenticationTest.java | 4 +++- .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 4 +++- .../pulsar/websocket/proxy/ProxyConfigurationTest.java | 4 +++- .../pulsar/websocket/proxy/ProxyPublishConsumeTest.java | 4 +++- .../pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java | 4 +++- .../websocket/proxy/ProxyPublishConsumeWithoutZKTest.java | 4 +++- .../websocket/proxy/v1/V1_ProxyAuthenticationTest.java | 4 +++- .../org/apache/pulsar/functions/worker/WorkerConfig.java | 6 ++++++ .../main/java/org/apache/pulsar/functions/worker/Worker.java | 3 ++- .../org/apache/pulsar/proxy/server/ProxyConfiguration.java | 6 ++++++ .../java/org/apache/pulsar/proxy/server/ProxyService.java | 8 +++++--- .../java/org/apache/pulsar/websocket/WebSocketService.java | 10 ++++++---- 15 files changed, 64 insertions(+), 19 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b0a8581aac9..6337d3cf7e4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -417,6 +417,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int zooKeeperCacheExpirySeconds = -1; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Is zookeeper allow read-only operations." + ) + private boolean zooKeeperAllowReadOnlyOperations; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index a087d8090d3..3c1a8fb41df 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -88,9 +88,17 @@ public class PulsarResources { this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore); } - public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs) + public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs, + boolean allowReadOnlyOperations) throws MetadataStoreException { return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder() - .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build()); + .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build()); + } + + public static MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs, + boolean allowReadOnlyOperations) + throws MetadataStoreException { + return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder() + .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 2bda5c55398..e2034bdc60b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -347,7 +347,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(), MetadataStoreConfig.builder() .sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis()) - .allowReadOnlyOperations(false) + .allowReadOnlyOperations(config.isZooKeeperAllowReadOnlyOperations()) .configFilePath(config.getMetadataStoreConfigPath()) .batchingEnabled(config.isMetadataStoreBatchingEnabled()) .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis()) @@ -980,7 +980,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { return MetadataStoreExtended.create(config.getMetadataStoreUrl(), MetadataStoreConfig.builder() .sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis()) - .allowReadOnlyOperations(false) + .allowReadOnlyOperations(config.isZooKeeperAllowReadOnlyOperations()) .configFilePath(config.getMetadataStoreConfigPath()) .batchingEnabled(config.isMetadataStoreBatchingEnabled()) .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index a34ec879ba6..49ba21ce388 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -83,7 +84,8 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { } service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index 7e0ee1bd466..a62f4751459 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.websocket.proxy; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -64,7 +65,8 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { config.setWebServicePort(Optional.of(0)); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); service.start(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java index 26cf8a0e154..8ff00cf0ce9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.websocket.proxy; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -67,7 +68,8 @@ public class ProxyConfigurationTest extends ProducerConsumerBase { config.setServiceUrl("http://localhost:8080"); config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100"); WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); service.start(); PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 918640642ec..02d9ceb1c1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -101,7 +102,8 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { config.setClusterName("test"); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index a8b67416107..b98414a9270 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -74,7 +75,8 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase { config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 1fb12645e5e..dadc246c98e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy; import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -62,7 +63,8 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase { config.setServiceUrl(pulsar.getSafeWebServiceAddress()); config.setServiceUrlTls(pulsar.getWebServiceAddressTls()); service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeper)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index b80c3fb07be..af9b00e5921 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy.v1; import static java.util.concurrent.Executors.newFixedThreadPool; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -85,7 +86,8 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase { } service = spyWithClassAndConstructorArgs(WebSocketService.class, config); - doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt()); + doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service) + .createConfigMetadataStore(anyString(), anyInt(), anyBoolean()); proxyServer = new ProxyServer(config); WebSocketServiceStarter.start(proxyServer, service); log.info("Proxy Server Started"); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index a55f49bfc20..27a6e8eea51 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -205,6 +205,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { ) private int zooKeeperCacheExpirySeconds = -1; + @FieldContext( + category = CATEGORY_WORKER, + doc = "Is zooKeeper allow read-only operations." + ) + private boolean zooKeeperAllowReadOnlyOperations; + @FieldContext( category = CATEGORY_CONNECTORS, doc = "The path to the location to locate builtin connectors" diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index f8871cb86e7..189daae348f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -75,7 +75,8 @@ public class Worker { try { configMetadataStore = PulsarResources.createMetadataStore( workerConfig.getConfigurationMetadataStoreUrl(), - (int) workerConfig.getMetadataStoreSessionTimeoutMillis()); + (int) workerConfig.getMetadataStoreSessionTimeoutMillis(), + workerConfig.isZooKeeperAllowReadOnlyOperations()); } catch (IOException e) { throw new PulsarServerException(e); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index d139bb83002..a3183eb749c 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -148,6 +148,12 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private int zooKeeperCacheExpirySeconds = -1; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Is zooKeeper allow read-only operations." + ) + private boolean zooKeeperAllowReadOnlyOperations; + @FieldContext( category = CATEGORY_BROKER_DISCOVERY, doc = "The service url points to the broker cluster" diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 1960b5143a0..c29e2ba1696 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -427,12 +427,14 @@ public class ProxyService implements Closeable { public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { return PulsarResources.createMetadataStore(proxyConfig.getMetadataStoreUrl(), - proxyConfig.getMetadataStoreSessionTimeoutMillis()); + proxyConfig.getMetadataStoreSessionTimeoutMillis(), + proxyConfig.isZooKeeperAllowReadOnlyOperations()); } public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { - return PulsarResources.createMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(), - proxyConfig.getMetadataStoreSessionTimeoutMillis()); + return PulsarResources.createConfigMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(), + proxyConfig.getMetadataStoreSessionTimeoutMillis(), + proxyConfig.isZooKeeperAllowReadOnlyOperations()); } public Authentication getProxyClientAuthenticationPlugin() { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index a57c6c491e7..09be1b70264 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -102,8 +102,9 @@ public class WebSocketService implements Closeable { if (isNotBlank(config.getConfigurationMetadataStoreUrl())) { try { - configMetadataStore = createMetadataStore(config.getConfigurationMetadataStoreUrl(), - (int) config.getMetadataStoreSessionTimeoutMillis()); + configMetadataStore = createConfigMetadataStore(config.getConfigurationMetadataStoreUrl(), + (int) config.getMetadataStoreSessionTimeoutMillis(), + config.isZooKeeperAllowReadOnlyOperations()); } catch (MetadataStoreException e) { throw new PulsarServerException(e); } @@ -123,9 +124,10 @@ public class WebSocketService implements Closeable { log.info("Pulsar WebSocket Service started"); } - public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs) + public MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs, boolean + isAllowReadOnlyOperations) throws MetadataStoreException { - return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs); + return PulsarResources.createConfigMetadataStore(serverUrls, sessionTimeoutMs, isAllowReadOnlyOperations); } @Override
