This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 757b96d5cf3299e6a1b5994f8a7ecc1811aaf028
Author: Kai Wang <[email protected]>
AuthorDate: Mon Aug 5 10:07:10 2024 +0800

    [improve][pip] PIP-366: Support to specify different config for Configuration and Local Metadata Store (#23041)
    
    (cherry picked from commit 76f16e811beb4f48fb2ae5c46558b74d333c7d60)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 ++-
 .../org/apache/pulsar/broker/PulsarService.java    |  6 ++-
 .../metadata/impl/PulsarZooKeeperClient.java       | 53 +++++++++++++------
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  2 +
 .../metadata/impl/oxia/OxiaMetadataStore.java      | 21 ++++----
 .../pulsar/metadata/BaseMetadataStoreTest.java     |  2 +-
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 60 +++++++++++++++++++++-
 .../src/test/resources/oxia_client.conf            | 20 ++++++++
 .../test/resources/zk_client_disabled_sasl.conf    | 20 ++++++++
 9 files changed, 164 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 165dae7e12d..4f6525309ba 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -562,10 +562,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_SERVER,
-            doc = "Configuration file path for local metadata store. It's supported by RocksdbMetadataStore for now."
+            doc = "Configuration file path for local metadata store."
     )
     private String metadataStoreConfigPath = null;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Configuration file path for configuration metadata store."
+    )
+    private String configurationStoreConfigPath = null;
+
     @FieldContext(
             dynamic = true,
             category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c9c8407a13d..60c37c0925b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -379,11 +379,15 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
     public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
             throws MetadataStoreException {
+        String configFilePath = config.getMetadataStoreConfigPath();
+        if (StringUtils.isNotBlank(config.getConfigurationStoreConfigPath())) {
+            configFilePath = config.getConfigurationStoreConfigPath();
+        }
         return 
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) 
config.getMetadataStoreSessionTimeoutMillis())
                         
.allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations())
-                        .configFilePath(config.getMetadataStoreConfigPath())
+                        .configFilePath(configFilePath)
                         
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
                         
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
                         
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index cc29b615c11..e8bfb39395a 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -61,8 +61,10 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
 /**
  * Provide a zookeeper client to handle session expire.
@@ -92,6 +94,9 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
     private final RetryPolicy connectRetryPolicy;
     private final RetryPolicy operationRetryPolicy;
 
+    // Zookeeper config path
+    private final String configPath;
+
     // Stats Logger
     private final OpStatsLogger createStats;
     private final OpStatsLogger getStats;
@@ -120,8 +125,9 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
                         ZooKeeper newZk;
                         try {
                             newZk = createZooKeeper();
-                        } catch (IOException ie) {
-                            log.error("Failed to create zookeeper instance to 
" + connectString, ie);
+                        } catch (IOException | 
QuorumPeerConfig.ConfigException e) {
+                            log.error("Failed to create zookeeper instance to 
{} with config path {}",
+                                    connectString, configPath, e);
                             throw 
KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
                         }
                         waitForConnection();
@@ -149,7 +155,7 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
     static PulsarZooKeeperClient createConnectedZooKeeperClient(
             String connectString, int sessionTimeoutMs, Set<Watcher> 
childWatchers,
             RetryPolicy operationRetryPolicy)
-            throws KeeperException, InterruptedException, IOException {
+            throws KeeperException, InterruptedException, IOException, 
QuorumPeerConfig.ConfigException {
         return PulsarZooKeeperClient.newBuilder()
                 .connectString(connectString)
                 .sessionTimeoutMs(sessionTimeoutMs)
@@ -171,6 +177,7 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
         int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
         double requestRateLimit = 0;
         boolean allowReadOnlyMode = false;
+        String configPath = null;
 
         private Builder() {}
 
@@ -219,7 +226,15 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
             return this;
         }
 
-        public PulsarZooKeeperClient build() throws IOException, 
KeeperException, InterruptedException {
+        public Builder configPath(String configPath) {
+            this.configPath = configPath;
+            return this;
+        }
+
+        public PulsarZooKeeperClient build() throws IOException,
+                KeeperException,
+                InterruptedException,
+                QuorumPeerConfig.ConfigException {
             requireNonNull(connectString);
             checkArgument(sessionTimeoutMs > 0);
             requireNonNull(statsLogger);
@@ -251,7 +266,8 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
                     statsLogger,
                     retryExecThreadCount,
                     requestRateLimit,
-                    allowReadOnlyMode
+                    allowReadOnlyMode,
+                    configPath
             );
             // Wait for connection to be established.
             try {
@@ -273,16 +289,19 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
     }
 
     protected PulsarZooKeeperClient(String connectString,
-                              int sessionTimeoutMs,
-                              ZooKeeperWatcherBase watcherManager,
-                              RetryPolicy connectRetryPolicy,
-                              RetryPolicy operationRetryPolicy,
-                              StatsLogger statsLogger,
-                              int retryExecThreadCount,
-                              double rate,
-                              boolean allowReadOnlyMode) throws IOException {
-        super(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
+                                    int sessionTimeoutMs,
+                                    ZooKeeperWatcherBase watcherManager,
+                                    RetryPolicy connectRetryPolicy,
+                                    RetryPolicy operationRetryPolicy,
+                                    StatsLogger statsLogger,
+                                    int retryExecThreadCount,
+                                    double rate,
+                                    boolean allowReadOnlyMode,
+                                    String configPath) throws IOException, 
QuorumPeerConfig.ConfigException {
+        super(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode,
+                configPath == null ? null : new ZKClientConfig(configPath));
         this.connectString = connectString;
+        this.configPath = configPath;
         this.sessionTimeoutMs = sessionTimeoutMs;
         this.allowReadOnlyMode =  allowReadOnlyMode;
         this.watcherManager = watcherManager;
@@ -334,7 +353,11 @@ public class PulsarZooKeeperClient extends ZooKeeper 
implements Watcher, AutoClo
         watcherManager.waitForConnection();
     }
 
-    protected ZooKeeper createZooKeeper() throws IOException {
+    protected ZooKeeper createZooKeeper() throws IOException, 
QuorumPeerConfig.ConfigException {
+        if (null != configPath) {
+            return new ZooKeeper(connectString, sessionTimeoutMs, 
watcherManager, allowReadOnlyMode,
+                    new ZKClientConfig(configPath));
+        }
         return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
     }
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 2e88cb33324..603a4503dc8 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -100,6 +100,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                     
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
                     
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
                     
.watchers(Collections.singleton(this::processSessionWatcher))
+                    .configPath(metadataStoreConfig.getConfigFilePath())
                     .build();
             if (enableSessionWatcher) {
                 sessionWatcher = new ZKSessionWatcher(zkc, 
this::receivedSessionEvent);
@@ -577,6 +578,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                     .connectRetryPolicy(
                             new 
BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
                                     
metadataStoreConfig.getSessionTimeoutMillis(), 0))
+                    .configPath(metadataStoreConfig.getConfigFilePath())
                     .build()) {
                 if (chrootZk.exists(chrootPath, false) == null) {
                     createFullPathOptimistic(chrootZk, chrootPath, new 
byte[0], CreateMode.PERSISTENT);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index f85e3d2dc75..38bf7265feb 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -71,15 +72,17 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
         }
         updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer());
         identity = UUID.randomUUID().toString();
-        client =
-                OxiaClientBuilder.create(serviceAddress)
-                        .clientIdentifier(identity)
-                        .namespace(namespace)
-                        
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
-                        .batchLinger(Duration.ofMillis(linger))
-                        
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations())
-                        .asyncClient()
-                        .get();
+        OxiaClientBuilder oxiaClientBuilder = OxiaClientBuilder
+                .create(serviceAddress)
+                .clientIdentifier(identity)
+                .namespace(namespace)
+                
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
+                .batchLinger(Duration.ofMillis(linger))
+                
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations());
+        if (StringUtils.isNotBlank(metadataStoreConfig.getConfigFilePath())) {
+            
oxiaClientBuilder.loadConfig(metadataStoreConfig.getConfigFilePath());
+        }
+        client = oxiaClientBuilder.asyncClient().get();
         client.notifications(this::notificationCallback);
         
super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer()));
     }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 491e3d0b964..c77de92ae3c 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -100,7 +100,7 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
         };
     }
 
-    private synchronized String getOxiaServerConnectString() {
+    protected synchronized String getOxiaServerConnectString() {
         if (oxiaServer == null) {
             oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME);
             oxiaServer.start();
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index b1578188c68..2c589dfd482 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -24,9 +24,11 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -36,7 +38,13 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+
+import io.streamnative.oxia.client.ClientConfig;
+import io.streamnative.oxia.client.api.AsyncOxiaClient;
+import io.streamnative.oxia.client.session.SessionFactory;
+import io.streamnative.oxia.client.session.SessionManager;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -51,9 +59,15 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
 import org.assertj.core.util.Lists;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -457,7 +471,8 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         MetadataStoreConfig config = builder.build();
         @Cleanup
         ZKMetadataStore store = (ZKMetadataStore) 
MetadataStoreFactory.create(zks.getConnectionString(), config);
-
+        ZooKeeper zkClient = store.getZkClient();
+        assertTrue(zkClient.getClientConfig().isSaslClientEnabled());
         final Runnable verify = () -> {
             String currentThreadName = Thread.currentThread().getName();
             String errorMessage = String.format("Expect to switch to thread 
%s, but currently it is thread %s",
@@ -500,6 +515,49 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         }).join();
     }
 
+    @Test
+    public void testZkLoadConfigFromFile() throws Exception {
+        final String metadataStoreName = 
UUID.randomUUID().toString().replaceAll("-", "");
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(true);
+        
builder.configFilePath("src/test/resources/zk_client_disabled_sasl.conf");
+        MetadataStoreConfig config = builder.build();
+        @Cleanup
+        ZKMetadataStore store = (ZKMetadataStore) 
MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+        PulsarZooKeeperClient zkClient = (PulsarZooKeeperClient) 
store.getZkClient();
+        assertFalse(zkClient.getClientConfig().isSaslClientEnabled());
+
+        zkClient.process(new WatchedEvent(Watcher.Event.EventType.None, 
Watcher.Event.KeeperState.Expired, null));
+
+        var zooKeeperRef = (AtomicReference<ZooKeeper>) 
WhiteboxImpl.getInternalState(zkClient, "zk");
+        var zooKeeper = Awaitility.await().until(zooKeeperRef::get, 
Objects::nonNull);
+        assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled());
+    }
+
+    @Test
+    public void testOxiaLoadConfigFromFile() throws Exception {
+        final String metadataStoreName = 
UUID.randomUUID().toString().replaceAll("-", "");
+        String oxia = "oxia://" + getOxiaServerConnectString();
+        MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+                
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+        builder.fsyncEnable(false);
+        builder.batchingEnabled(true);
+        builder.sessionTimeoutMillis(30000);
+        builder.configFilePath("src/test/resources/oxia_client.conf");
+        MetadataStoreConfig config = builder.build();
+
+        OxiaMetadataStore store = (OxiaMetadataStore) 
MetadataStoreFactory.create(oxia, config);
+        var client = (AsyncOxiaClient) WhiteboxImpl.getInternalState(store, 
"client");
+        var sessionManager = (SessionManager) 
WhiteboxImpl.getInternalState(client, "sessionManager");
+        var sessionFactory = (SessionFactory) 
WhiteboxImpl.getInternalState(sessionManager, "factory");
+        var clientConfig = (ClientConfig) 
WhiteboxImpl.getInternalState(sessionFactory, "config");
+        var sessionTimeout = clientConfig.sessionTimeout();
+        assertEquals(sessionTimeout, Duration.ofSeconds(60));
+    }
+
     @Test(dataProvider = "impl")
     public void testPersistent(String provider, Supplier<String> urlSupplier) 
throws Exception {
         String metadataUrl = urlSupplier.get();
diff --git a/pulsar-metadata/src/test/resources/oxia_client.conf 
b/pulsar-metadata/src/test/resources/oxia_client.conf
new file mode 100644
index 00000000000..3e92f05a340
--- /dev/null
+++ b/pulsar-metadata/src/test/resources/oxia_client.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+sessionTimeout=60000
diff --git a/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf 
b/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf
new file mode 100644
index 00000000000..9e0f6e8fd0f
--- /dev/null
+++ b/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+zookeeper.sasl.client=false

Reply via email to