This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 757b96d5cf3299e6a1b5994f8a7ecc1811aaf028 Author: Kai Wang <[email protected]> AuthorDate: Mon Aug 5 10:07:10 2024 +0800 [improve][pip] PIP-366: Support to specify different config for Configuration and Local Metadata Store (#23041) (cherry picked from commit 76f16e811beb4f48fb2ae5c46558b74d333c7d60) --- .../apache/pulsar/broker/ServiceConfiguration.java | 8 ++- .../org/apache/pulsar/broker/PulsarService.java | 6 ++- .../metadata/impl/PulsarZooKeeperClient.java | 53 +++++++++++++------ .../pulsar/metadata/impl/ZKMetadataStore.java | 2 + .../metadata/impl/oxia/OxiaMetadataStore.java | 21 ++++---- .../pulsar/metadata/BaseMetadataStoreTest.java | 2 +- .../apache/pulsar/metadata/MetadataStoreTest.java | 60 +++++++++++++++++++++- .../src/test/resources/oxia_client.conf | 20 ++++++++ .../test/resources/zk_client_disabled_sasl.conf | 20 ++++++++ 9 files changed, 164 insertions(+), 28 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 165dae7e12d..4f6525309ba 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -562,10 +562,16 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_SERVER, - doc = "Configuration file path for local metadata store. It's supported by RocksdbMetadataStore for now." + doc = "Configuration file path for local metadata store." ) private String metadataStoreConfigPath = null; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Configuration file path for configuration metadata store." + ) + private String configurationStoreConfigPath = null; + @FieldContext( dynamic = true, category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c9c8407a13d..60c37c0925b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -379,11 +379,15 @@ public class PulsarService implements AutoCloseable, ShutdownService { public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) throws MetadataStoreException { + String configFilePath = config.getMetadataStoreConfigPath(); + if (StringUtils.isNotBlank(config.getConfigurationStoreConfigPath())) { + configFilePath = config.getConfigurationStoreConfigPath(); + } return MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(), MetadataStoreConfig.builder() .sessionTimeoutMillis((int) config.getMetadataStoreSessionTimeoutMillis()) .allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations()) - .configFilePath(config.getMetadataStoreConfigPath()) + .configFilePath(configFilePath) .batchingEnabled(config.isMetadataStoreBatchingEnabled()) .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis()) .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations()) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index cc29b615c11..e8bfb39395a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -61,8 +61,10 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; /** * Provide a zookeeper client to handle session expire. @@ -92,6 +94,9 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo private final RetryPolicy connectRetryPolicy; private final RetryPolicy operationRetryPolicy; + // Zookeeper config path + private final String configPath; + // Stats Logger private final OpStatsLogger createStats; private final OpStatsLogger getStats; @@ -120,8 +125,9 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo ZooKeeper newZk; try { newZk = createZooKeeper(); - } catch (IOException ie) { - log.error("Failed to create zookeeper instance to " + connectString, ie); + } catch (IOException | QuorumPeerConfig.ConfigException e) { + log.error("Failed to create zookeeper instance to {} with config path {}", + connectString, configPath, e); throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } waitForConnection(); @@ -149,7 +155,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo static PulsarZooKeeperClient createConnectedZooKeeperClient( String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers, RetryPolicy operationRetryPolicy) - throws KeeperException, InterruptedException, IOException { + throws KeeperException, InterruptedException, IOException, QuorumPeerConfig.ConfigException { return PulsarZooKeeperClient.newBuilder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) @@ -171,6 +177,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT; double requestRateLimit = 0; boolean allowReadOnlyMode = false; + String configPath = null; private Builder() {} @@ -219,7 +226,15 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo return this; } - public PulsarZooKeeperClient build() throws IOException, KeeperException, InterruptedException { + public Builder configPath(String configPath) { + this.configPath = configPath; + return this; + } + + public PulsarZooKeeperClient build() throws IOException, + KeeperException, + InterruptedException, + QuorumPeerConfig.ConfigException { requireNonNull(connectString); checkArgument(sessionTimeoutMs > 0); requireNonNull(statsLogger); @@ -251,7 +266,8 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo statsLogger, retryExecThreadCount, requestRateLimit, - allowReadOnlyMode + allowReadOnlyMode, + configPath ); // Wait for connection to be established. try { @@ -273,16 +289,19 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo } protected PulsarZooKeeperClient(String connectString, - int sessionTimeoutMs, - ZooKeeperWatcherBase watcherManager, - RetryPolicy connectRetryPolicy, - RetryPolicy operationRetryPolicy, - StatsLogger statsLogger, - int retryExecThreadCount, - double rate, - boolean allowReadOnlyMode) throws IOException { - super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + int sessionTimeoutMs, + ZooKeeperWatcherBase watcherManager, + RetryPolicy connectRetryPolicy, + RetryPolicy operationRetryPolicy, + StatsLogger statsLogger, + int retryExecThreadCount, + double rate, + boolean allowReadOnlyMode, + String configPath) throws IOException, QuorumPeerConfig.ConfigException { + super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode, + configPath == null ? null : new ZKClientConfig(configPath)); this.connectString = connectString; + this.configPath = configPath; this.sessionTimeoutMs = sessionTimeoutMs; this.allowReadOnlyMode = allowReadOnlyMode; this.watcherManager = watcherManager; @@ -334,7 +353,11 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo watcherManager.waitForConnection(); } - protected ZooKeeper createZooKeeper() throws IOException { + protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException { + if (null != configPath) { + return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode, + new ZKClientConfig(configPath)); + } return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 2e88cb33324..603a4503dc8 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -100,6 +100,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore .allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations()) .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()) .watchers(Collections.singleton(this::processSessionWatcher)) + .configPath(metadataStoreConfig.getConfigFilePath()) .build(); if (enableSessionWatcher) { sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); @@ -577,6 +578,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore .connectRetryPolicy( new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(), metadataStoreConfig.getSessionTimeoutMillis(), 0)) + .configPath(metadataStoreConfig.getConfigFilePath()) .build()) { if (chrootZk.exists(chrootPath, false) == null) { createFullPathOptimistic(chrootZk, chrootPath, new byte[0], CreateMode.PERSISTENT); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java index f85e3d2dc75..38bf7265feb 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -71,15 +72,17 @@ public class OxiaMetadataStore extends AbstractMetadataStore { } updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer()); identity = UUID.randomUUID().toString(); - client = - OxiaClientBuilder.create(serviceAddress) - .clientIdentifier(identity) - .namespace(namespace) - .sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis())) - .batchLinger(Duration.ofMillis(linger)) - .maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations()) - .asyncClient() - .get(); + OxiaClientBuilder oxiaClientBuilder = OxiaClientBuilder + .create(serviceAddress) + .clientIdentifier(identity) + .namespace(namespace) + .sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis())) + .batchLinger(Duration.ofMillis(linger)) + .maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations()); + if (StringUtils.isNotBlank(metadataStoreConfig.getConfigFilePath())) { + oxiaClientBuilder.loadConfig(metadataStoreConfig.getConfigFilePath()); + } + client = oxiaClientBuilder.asyncClient().get(); client.notifications(this::notificationCallback); super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer())); } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java index 491e3d0b964..c77de92ae3c 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java @@ -100,7 +100,7 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport { }; } - private synchronized String getOxiaServerConnectString() { + protected synchronized String getOxiaServerConnectString() { if (oxiaServer == null) { oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME); oxiaServer.start(); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index b1578188c68..2c589dfd482 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -24,9 +24,11 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -36,7 +38,13 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; + +import io.streamnative.oxia.client.ClientConfig; +import io.streamnative.oxia.client.api.AsyncOxiaClient; +import io.streamnative.oxia.client.session.SessionFactory; +import io.streamnative.oxia.client.session.SessionManager; import lombok.Cleanup; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -51,9 +59,15 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient; import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; import org.assertj.core.util.Lists; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -457,7 +471,8 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { MetadataStoreConfig config = builder.build(); @Cleanup ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config); - + ZooKeeper zkClient = store.getZkClient(); + assertTrue(zkClient.getClientConfig().isSaslClientEnabled()); final Runnable verify = () -> { String currentThreadName = Thread.currentThread().getName(); String errorMessage = String.format("Expect to switch to thread %s, but currently it is thread %s", @@ -500,6 +515,49 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { }).join(); } + @Test + public void testZkLoadConfigFromFile() throws Exception { + final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", ""); + MetadataStoreConfig.MetadataStoreConfigBuilder builder = + MetadataStoreConfig.builder().metadataStoreName(metadataStoreName); + builder.fsyncEnable(false); + builder.batchingEnabled(true); + builder.configFilePath("src/test/resources/zk_client_disabled_sasl.conf"); + MetadataStoreConfig config = builder.build(); + @Cleanup + ZKMetadataStore store = (ZKMetadataStore) MetadataStoreFactory.create(zks.getConnectionString(), config); + + PulsarZooKeeperClient zkClient = (PulsarZooKeeperClient) store.getZkClient(); + assertFalse(zkClient.getClientConfig().isSaslClientEnabled()); + + zkClient.process(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); + + var zooKeeperRef = (AtomicReference<ZooKeeper>) WhiteboxImpl.getInternalState(zkClient, "zk"); + var zooKeeper = Awaitility.await().until(zooKeeperRef::get, Objects::nonNull); + assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled()); + } + + @Test + public void testOxiaLoadConfigFromFile() throws Exception { + final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", ""); + String oxia = "oxia://" + getOxiaServerConnectString(); + MetadataStoreConfig.MetadataStoreConfigBuilder builder = + MetadataStoreConfig.builder().metadataStoreName(metadataStoreName); + builder.fsyncEnable(false); + builder.batchingEnabled(true); + builder.sessionTimeoutMillis(30000); + builder.configFilePath("src/test/resources/oxia_client.conf"); + MetadataStoreConfig config = builder.build(); + + OxiaMetadataStore store = (OxiaMetadataStore) MetadataStoreFactory.create(oxia, config); + var client = (AsyncOxiaClient) WhiteboxImpl.getInternalState(store, "client"); + var sessionManager = (SessionManager) WhiteboxImpl.getInternalState(client, "sessionManager"); + var sessionFactory = (SessionFactory) WhiteboxImpl.getInternalState(sessionManager, "factory"); + var clientConfig = (ClientConfig) WhiteboxImpl.getInternalState(sessionFactory, "config"); + var sessionTimeout = clientConfig.sessionTimeout(); + assertEquals(sessionTimeout, Duration.ofSeconds(60)); + } + @Test(dataProvider = "impl") public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception { String metadataUrl = urlSupplier.get(); diff --git a/pulsar-metadata/src/test/resources/oxia_client.conf b/pulsar-metadata/src/test/resources/oxia_client.conf new file mode 100644 index 00000000000..3e92f05a340 --- /dev/null +++ b/pulsar-metadata/src/test/resources/oxia_client.conf @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sessionTimeout=60000 diff --git a/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf b/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf new file mode 100644 index 00000000000..9e0f6e8fd0f --- /dev/null +++ b/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +zookeeper.sasl.client=false
