This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 76f16e811be [improve][pip] PIP-366: Support to specify different
config for Configuration and Local Metadata Store (#23041)
76f16e811be is described below
commit 76f16e811beb4f48fb2ae5c46558b74d333c7d60
Author: Kai Wang <[email protected]>
AuthorDate: Mon Aug 5 10:07:10 2024 +0800
[improve][pip] PIP-366: Support to specify different config for
Configuration and Local Metadata Store (#23041)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 8 ++-
.../org/apache/pulsar/broker/PulsarService.java | 6 ++-
.../metadata/impl/PulsarZooKeeperClient.java | 53 +++++++++++++------
.../pulsar/metadata/impl/ZKMetadataStore.java | 2 +
.../metadata/impl/oxia/OxiaMetadataStore.java | 21 ++++----
.../pulsar/metadata/BaseMetadataStoreTest.java | 2 +-
.../apache/pulsar/metadata/MetadataStoreTest.java | 60 +++++++++++++++++++++-
.../src/test/resources/oxia_client.conf | 20 ++++++++
.../test/resources/zk_client_disabled_sasl.conf | 20 ++++++++
9 files changed, 164 insertions(+), 28 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2d2765287c0..26b2f99abf5 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -562,10 +562,16 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_SERVER,
- doc = "Configuration file path for local metadata store. It's
supported by RocksdbMetadataStore for now."
+ doc = "Configuration file path for local metadata store."
)
private String metadataStoreConfigPath = null;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Configuration file path for configuration metadata store."
+ )
+ private String configurationStoreConfigPath = null;
+
@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 060e905d1f3..3d57a3bc010 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -391,11 +391,15 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
OpenTelemetry
openTelemetry)
throws MetadataStoreException {
+ String configFilePath = config.getMetadataStoreConfigPath();
+ if (StringUtils.isNotBlank(config.getConfigurationStoreConfigPath())) {
+ configFilePath = config.getConfigurationStoreConfigPath();
+ }
return
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int)
config.getMetadataStoreSessionTimeoutMillis())
.allowReadOnlyOperations(config.isMetadataStoreAllowReadOnlyOperations())
- .configFilePath(config.getMetadataStoreConfigPath())
+ .configFilePath(configFilePath)
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index cc29b615c11..e8bfb39395a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -61,8 +61,10 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
/**
* Provide a zookeeper client to handle session expire.
@@ -92,6 +94,9 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
private final RetryPolicy connectRetryPolicy;
private final RetryPolicy operationRetryPolicy;
+ // Zookeeper config path
+ private final String configPath;
+
// Stats Logger
private final OpStatsLogger createStats;
private final OpStatsLogger getStats;
@@ -120,8 +125,9 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
ZooKeeper newZk;
try {
newZk = createZooKeeper();
- } catch (IOException ie) {
- log.error("Failed to create zookeeper instance to
" + connectString, ie);
+ } catch (IOException |
QuorumPeerConfig.ConfigException e) {
+ log.error("Failed to create zookeeper instance to
{} with config path {}",
+ connectString, configPath, e);
throw
KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
waitForConnection();
@@ -149,7 +155,7 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
static PulsarZooKeeperClient createConnectedZooKeeperClient(
String connectString, int sessionTimeoutMs, Set<Watcher>
childWatchers,
RetryPolicy operationRetryPolicy)
- throws KeeperException, InterruptedException, IOException {
+ throws KeeperException, InterruptedException, IOException,
QuorumPeerConfig.ConfigException {
return PulsarZooKeeperClient.newBuilder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
@@ -171,6 +177,7 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
double requestRateLimit = 0;
boolean allowReadOnlyMode = false;
+ String configPath = null;
private Builder() {}
@@ -219,7 +226,15 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
return this;
}
- public PulsarZooKeeperClient build() throws IOException,
KeeperException, InterruptedException {
+ public Builder configPath(String configPath) {
+ this.configPath = configPath;
+ return this;
+ }
+
+ public PulsarZooKeeperClient build() throws IOException,
+ KeeperException,
+ InterruptedException,
+ QuorumPeerConfig.ConfigException {
requireNonNull(connectString);
checkArgument(sessionTimeoutMs > 0);
requireNonNull(statsLogger);
@@ -251,7 +266,8 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
statsLogger,
retryExecThreadCount,
requestRateLimit,
- allowReadOnlyMode
+ allowReadOnlyMode,
+ configPath
);
// Wait for connection to be established.
try {
@@ -273,16 +289,19 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
}
protected PulsarZooKeeperClient(String connectString,
- int sessionTimeoutMs,
- ZooKeeperWatcherBase watcherManager,
- RetryPolicy connectRetryPolicy,
- RetryPolicy operationRetryPolicy,
- StatsLogger statsLogger,
- int retryExecThreadCount,
- double rate,
- boolean allowReadOnlyMode) throws IOException {
- super(connectString, sessionTimeoutMs, watcherManager,
allowReadOnlyMode);
+ int sessionTimeoutMs,
+ ZooKeeperWatcherBase watcherManager,
+ RetryPolicy connectRetryPolicy,
+ RetryPolicy operationRetryPolicy,
+ StatsLogger statsLogger,
+ int retryExecThreadCount,
+ double rate,
+ boolean allowReadOnlyMode,
+ String configPath) throws IOException,
QuorumPeerConfig.ConfigException {
+ super(connectString, sessionTimeoutMs, watcherManager,
allowReadOnlyMode,
+ configPath == null ? null : new ZKClientConfig(configPath));
this.connectString = connectString;
+ this.configPath = configPath;
this.sessionTimeoutMs = sessionTimeoutMs;
this.allowReadOnlyMode = allowReadOnlyMode;
this.watcherManager = watcherManager;
@@ -334,7 +353,11 @@ public class PulsarZooKeeperClient extends ZooKeeper
implements Watcher, AutoClo
watcherManager.waitForConnection();
}
- protected ZooKeeper createZooKeeper() throws IOException {
+ protected ZooKeeper createZooKeeper() throws IOException,
QuorumPeerConfig.ConfigException {
+ if (null != configPath) {
+ return new ZooKeeper(connectString, sessionTimeoutMs,
watcherManager, allowReadOnlyMode,
+ new ZKClientConfig(configPath));
+ }
return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager,
allowReadOnlyMode);
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 2e88cb33324..603a4503dc8 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -100,6 +100,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
.watchers(Collections.singleton(this::processSessionWatcher))
+ .configPath(metadataStoreConfig.getConfigFilePath())
.build();
if (enableSessionWatcher) {
sessionWatcher = new ZKSessionWatcher(zkc,
this::receivedSessionEvent);
@@ -577,6 +578,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
.connectRetryPolicy(
new
BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
metadataStoreConfig.getSessionTimeoutMillis(), 0))
+ .configPath(metadataStoreConfig.getConfigFilePath())
.build()) {
if (chrootZk.exists(chrootPath, false) == null) {
createFullPathOptimistic(chrootZk, chrootPath, new
byte[0], CreateMode.PERSISTENT);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index e9da7ec7c1a..9141ad3d29c 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -80,15 +81,17 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
synchronizer =
Optional.ofNullable(metadataStoreConfig.getSynchronizer());
identity = UUID.randomUUID().toString();
- client =
- OxiaClientBuilder.create(serviceAddress)
- .clientIdentifier(identity)
- .namespace(namespace)
-
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
- .batchLinger(Duration.ofMillis(linger))
-
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations())
- .asyncClient()
- .get();
+ OxiaClientBuilder oxiaClientBuilder = OxiaClientBuilder
+ .create(serviceAddress)
+ .clientIdentifier(identity)
+ .namespace(namespace)
+
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
+ .batchLinger(Duration.ofMillis(linger))
+
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations());
+ if (StringUtils.isNotBlank(metadataStoreConfig.getConfigFilePath())) {
+
oxiaClientBuilder.loadConfig(metadataStoreConfig.getConfigFilePath());
+ }
+ client = oxiaClientBuilder.asyncClient().get();
init();
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 491e3d0b964..c77de92ae3c 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -100,7 +100,7 @@ public abstract class BaseMetadataStoreTest extends
TestRetrySupport {
};
}
- private synchronized String getOxiaServerConnectString() {
+ protected synchronized String getOxiaServerConnectString() {
if (oxiaServer == null) {
oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME);
oxiaServer.start();
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index b1578188c68..2c589dfd482 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -24,9 +24,11 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -36,7 +38,13 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+
+import io.streamnative.oxia.client.ClientConfig;
+import io.streamnative.oxia.client.api.AsyncOxiaClient;
+import io.streamnative.oxia.client.session.SessionFactory;
+import io.streamnative.oxia.client.session.SessionManager;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -51,9 +59,15 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -457,7 +471,8 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
MetadataStoreConfig config = builder.build();
@Cleanup
ZKMetadataStore store = (ZKMetadataStore)
MetadataStoreFactory.create(zks.getConnectionString(), config);
-
+ ZooKeeper zkClient = store.getZkClient();
+ assertTrue(zkClient.getClientConfig().isSaslClientEnabled());
final Runnable verify = () -> {
String currentThreadName = Thread.currentThread().getName();
String errorMessage = String.format("Expect to switch to thread
%s, but currently it is thread %s",
@@ -500,6 +515,49 @@ public class MetadataStoreTest extends
BaseMetadataStoreTest {
}).join();
}
+ @Test
+ public void testZkLoadConfigFromFile() throws Exception {
+ final String metadataStoreName =
UUID.randomUUID().toString().replaceAll("-", "");
+ MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+ builder.fsyncEnable(false);
+ builder.batchingEnabled(true);
+
builder.configFilePath("src/test/resources/zk_client_disabled_sasl.conf");
+ MetadataStoreConfig config = builder.build();
+ @Cleanup
+ ZKMetadataStore store = (ZKMetadataStore)
MetadataStoreFactory.create(zks.getConnectionString(), config);
+
+ PulsarZooKeeperClient zkClient = (PulsarZooKeeperClient)
store.getZkClient();
+ assertFalse(zkClient.getClientConfig().isSaslClientEnabled());
+
+ zkClient.process(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
+
+ var zooKeeperRef = (AtomicReference<ZooKeeper>)
WhiteboxImpl.getInternalState(zkClient, "zk");
+ var zooKeeper = Awaitility.await().until(zooKeeperRef::get,
Objects::nonNull);
+ assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled());
+ }
+
+ @Test
+ public void testOxiaLoadConfigFromFile() throws Exception {
+ final String metadataStoreName =
UUID.randomUUID().toString().replaceAll("-", "");
+ String oxia = "oxia://" + getOxiaServerConnectString();
+ MetadataStoreConfig.MetadataStoreConfigBuilder builder =
+
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName);
+ builder.fsyncEnable(false);
+ builder.batchingEnabled(true);
+ builder.sessionTimeoutMillis(30000);
+ builder.configFilePath("src/test/resources/oxia_client.conf");
+ MetadataStoreConfig config = builder.build();
+
+ OxiaMetadataStore store = (OxiaMetadataStore)
MetadataStoreFactory.create(oxia, config);
+ var client = (AsyncOxiaClient) WhiteboxImpl.getInternalState(store,
"client");
+ var sessionManager = (SessionManager)
WhiteboxImpl.getInternalState(client, "sessionManager");
+ var sessionFactory = (SessionFactory)
WhiteboxImpl.getInternalState(sessionManager, "factory");
+ var clientConfig = (ClientConfig)
WhiteboxImpl.getInternalState(sessionFactory, "config");
+ var sessionTimeout = clientConfig.sessionTimeout();
+ assertEquals(sessionTimeout, Duration.ofSeconds(60));
+ }
+
@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier)
throws Exception {
String metadataUrl = urlSupplier.get();
diff --git a/pulsar-metadata/src/test/resources/oxia_client.conf
b/pulsar-metadata/src/test/resources/oxia_client.conf
new file mode 100644
index 00000000000..3e92f05a340
--- /dev/null
+++ b/pulsar-metadata/src/test/resources/oxia_client.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+sessionTimeout=60000
diff --git a/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf
b/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf
new file mode 100644
index 00000000000..9e0f6e8fd0f
--- /dev/null
+++ b/pulsar-metadata/src/test/resources/zk_client_disabled_sasl.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+zookeeper.sasl.client=false