This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86205a9edfa [improve][broker]Enable custom metadata stores (#19208)
86205a9edfa is described below
commit 86205a9edfafb8710b24bdb0d2f1db84c5fc1926
Author: Andras Beni <[email protected]>
AuthorDate: Thu Jan 19 20:19:48 2023 +0100
[improve][broker]Enable custom metadata stores (#19208)
---
.../pulsar/metadata/api/MetadataStoreProvider.java | 37 +++++++
.../pulsar/metadata/impl/EtcdMetadataStore.java | 17 +++
.../metadata/impl/LocalMemoryMetadataStore.java | 17 +++
.../metadata/impl/MetadataStoreFactoryImpl.java | 67 +++++++----
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 17 +++
.../pulsar/metadata/impl/ZKMetadataStore.java | 20 ++++
.../impl/MetadataStoreFactoryImplTest.java | 123 +++++++++++++++++++++
7 files changed, 279 insertions(+), 19 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreProvider.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreProvider.java
new file mode 100644
index 00000000000..9c923aecb6c
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+public interface MetadataStoreProvider {
+
+
+ /**
+ * Scheme of the urls that MetadataStores provided by this object can
handle.
+ */
+ String urlScheme();
+
+ /**
+ * Creates a new MetadataStore.
+ * @throws MetadataStoreException if any exception happens while creating
the metadata store.
+ */
+ MetadataStore create(
+ String metadataURL,
+ MetadataStoreConfig metadataStoreConfig,
+ boolean enableSessionWatcher) throws MetadataStoreException;
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index 5710477fe06..a7fb7192cb5 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -72,8 +72,10 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
@@ -89,6 +91,7 @@ import org.apache.pulsar.metadata.impl.batching.OpPut;
@Slf4j
public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
+ static final String ETCD_SCHEME = "etcd";
static final String ETCD_SCHEME_IDENTIFIER = "etcd:";
private final int leaseTTLSeconds;
@@ -498,3 +501,17 @@ class EtcdConfig {
private String authority;
}
+
+class EtcdMetadataStoreProvider implements MetadataStoreProvider {
+
+ @Override
+ public String urlScheme() {
+ return EtcdMetadataStore.ETCD_SCHEME;
+ }
+
+ @Override
+ public MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig,
+ boolean enableSessionWatcher) throws
MetadataStoreException {
+ return new EtcdMetadataStore(metadataURL, metadataStoreConfig,
enableSessionWatcher);
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 616d0901c08..94d9a1f8937 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -37,10 +37,12 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
@@ -50,6 +52,7 @@ import
org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@Slf4j
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements
MetadataStoreExtended {
+ static final String MEMORY_SCHEME = "memory";
static final String MEMORY_SCHEME_IDENTIFIER = "memory:";
@Data
@@ -237,3 +240,17 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
}
}
+
+class MemoryMetadataStoreProvider implements MetadataStoreProvider {
+
+ @Override
+ public String urlScheme() {
+ return LocalMemoryMetadataStore.MEMORY_SCHEME;
+ }
+
+ @Override
+ public MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig,
+ boolean enableSessionWatcher) throws
MetadataStoreException {
+ return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
index c64c849f529..dd4df69fc43 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
@@ -18,13 +18,25 @@
*/
package org.apache.pulsar.metadata.impl;
+import static
org.apache.pulsar.metadata.impl.EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER;
+import static
org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER;
+import static
org.apache.pulsar.metadata.impl.RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER;
+import static
org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;
+import com.google.common.base.Splitter;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+@Slf4j
public class MetadataStoreFactoryImpl {
+ public static final String METADATASTORE_PROVIDERS_PROPERTY =
"pulsar.metadatastore.providers";
+
public static MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig) throws
MetadataStoreException {
return newInstance(metadataURL, metadataStoreConfig, false);
@@ -45,19 +57,41 @@ public class MetadataStoreFactoryImpl {
private static MetadataStore newInstance(String metadataURL,
MetadataStoreConfig metadataStoreConfig,
boolean enableSessionWatcher)
throws MetadataStoreException {
+ MetadataStoreProvider provider = findProvider(metadataURL);
+ return provider.create(metadataURL, metadataStoreConfig,
enableSessionWatcher);
+ }
+
+ static Map<String, MetadataStoreProvider> loadProviders() {
+ Map<String, MetadataStoreProvider> providers = new HashMap<>();
+ providers.put(MEMORY_SCHEME_IDENTIFIER, new
MemoryMetadataStoreProvider());
+ providers.put(ROCKSDB_SCHEME_IDENTIFIER, new
RocksdbMetadataStoreProvider());
+ providers.put(ETCD_SCHEME_IDENTIFIER, new EtcdMetadataStoreProvider());
+ providers.put(ZK_SCHEME_IDENTIFIER, new ZkMetadataStoreProvider());
+
+ String factoryClasses =
System.getProperty(METADATASTORE_PROVIDERS_PROPERTY, "");
+
+ for (String className :
Splitter.on(',').trimResults().omitEmptyStrings().split(factoryClasses)) {
+ try {
+ Class<? extends MetadataStoreProvider> clazz =
+ (Class<? extends MetadataStoreProvider>)
Class.forName(className);
+ MetadataStoreProvider provider =
clazz.getConstructor().newInstance();
+ String scheme = provider.urlScheme();
+ providers.put(scheme + ":", provider);
+ } catch (Exception e) {
+ log.warn("Failed to load metadata store provider class for
name '{}'", className, e);
+ }
+ }
+ return providers;
+ }
- if
(metadataURL.startsWith(LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER)) {
- return new LocalMemoryMetadataStore(metadataURL,
metadataStoreConfig);
- } else if
(metadataURL.startsWith(RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER)) {
- return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
- } else if
(metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) {
- return new EtcdMetadataStore(metadataURL, metadataStoreConfig,
enableSessionWatcher);
- } else if
(metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) {
- return new
ZKMetadataStore(metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length()),
- metadataStoreConfig, enableSessionWatcher);
- } else {
- return new ZKMetadataStore(metadataURL, metadataStoreConfig,
enableSessionWatcher);
+ private static MetadataStoreProvider findProvider(String metadataURL) {
+ Map<String, MetadataStoreProvider> providers = loadProviders();
+ for (Map.Entry<String, MetadataStoreProvider> entry :
providers.entrySet()) {
+ if (metadataURL.startsWith(entry.getKey())) {
+ return entry.getValue();
+ }
}
+ return providers.get(ZK_SCHEME_IDENTIFIER);
}
/**
@@ -70,14 +104,9 @@ public class MetadataStoreFactoryImpl {
* @return
*/
public static String removeIdentifierFromMetadataURL(String metadataURL) {
- if
(metadataURL.startsWith(LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER)) {
- return
metadataURL.substring(LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER.length());
- } else if
(metadataURL.startsWith(RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER)) {
- return
metadataURL.substring(RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER.length());
- } else if
(metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) {
- return
metadataURL.substring(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER.length());
- } else if
(metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) {
- return
metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length());
+ MetadataStoreProvider provider = findProvider(metadataURL);
+ if (metadataURL.startsWith(provider.urlScheme() + ":")) {
+ return metadataURL.substring(provider.urlScheme().length() + 1);
}
return metadataURL;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 5d7e74fadf5..fcec2fcf9c1 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -47,8 +47,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
@@ -75,6 +77,7 @@ import org.rocksdb.WriteOptions;
@Slf4j
public class RocksdbMetadataStore extends AbstractMetadataStore {
+ static final String ROCKSDB_SCHEME = "rocksdb";
static final String ROCKSDB_SCHEME_IDENTIFIER = "rocksdb:";
private static final byte[] SEQUENTIAL_ID_KEY =
toBytes("__metadata_sequentialId_key");
@@ -570,3 +573,17 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
return Optional.ofNullable(synchronizer);
}
}
+
+class RocksdbMetadataStoreProvider implements MetadataStoreProvider {
+
+ @Override
+ public String urlScheme() {
+ return RocksdbMetadataStore.ROCKSDB_SCHEME;
+ }
+
+ @Override
+ public MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig,
+ boolean enableSessionWatcher) throws
MetadataStoreException {
+ return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index fdd54bd1013..a6d8eb8344c 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -34,12 +34,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
@@ -69,6 +71,7 @@ import org.apache.zookeeper.client.ConnectStringParser;
public class ZKMetadataStore extends AbstractBatchedMetadataStore
implements MetadataStoreExtended, MetadataStoreLifecycle {
+ public static final String ZK_SCHEME = "zk";
public static final String ZK_SCHEME_IDENTIFIER = "zk:";
private final String zkConnectString;
@@ -614,3 +617,20 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
return rootPath;
}
}
+
+class ZkMetadataStoreProvider implements MetadataStoreProvider {
+
+ @Override
+ public String urlScheme() {
+ return ZKMetadataStore.ZK_SCHEME;
+ }
+
+ @Override
+ public MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig,
+ boolean enableSessionWatcher) throws
MetadataStoreException {
+ if (metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) {
+ metadataURL =
metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length());
+ }
+ return new ZKMetadataStore(metadataURL, metadataStoreConfig,
enableSessionWatcher);
+ }
+}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
new file mode 100644
index 00000000000..ba12ddff062
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+public class MetadataStoreFactoryImplTest {
+
+ private static Object originalProperty;
+
+ @BeforeClass
+ public void setMetadataStoreProperty() {
+ originalProperty =
System.getProperties().get(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY);
+
System.setProperty(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
+ MyMetadataStoreProvider.class.getName());
+ }
+
+ @AfterClass
+ public void resetMetadataStoreProperty() {
+ if (originalProperty != null) {
+
System.getProperties().put(MetadataStoreFactoryImpl.METADATASTORE_PROVIDERS_PROPERTY,
originalProperty);
+ }
+ }
+
+
+ @Test
+ public void testCreate() throws MetadataStoreException{
+ MetadataStore instance = MetadataStoreFactoryImpl.create(
+ "custom://localhost",
+ MetadataStoreConfig.builder().build());
+ assertTrue(instance instanceof MyMetadataStore);
+ }
+
+
+ @Test
+ public void testRemoveIdentifierFromMetadataURL() {
+
assertEquals(MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL("zk:host:port"),
"host:port");
+
assertEquals(MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL("rocksdb:/data/dir"),
"/data/dir");
+
assertEquals(MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL("etcd:host:port"),
"host:port");
+
assertEquals(MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL("memory:name"),
"name");
+
assertEquals(MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL("http://unknown/url/scheme"),
"http://unknown/url/scheme");
+
assertEquals(MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL("custom:suffix"),
"suffix");
+ }
+
+ public static class MyMetadataStoreProvider implements
MetadataStoreProvider {
+
+ @Override
+ public String urlScheme() {
+ return "custom";
+ }
+
+ @Override
+ public MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig,
+ boolean enableSessionWatcher) throws
MetadataStoreException {
+ return new MyMetadataStore();
+ }
+ }
+
+ public static class MyMetadataStore extends AbstractMetadataStore {
+ protected MyMetadataStore() {
+ super("custom");
+ }
+
+ @Override
+ protected CompletableFuture<List<String>> getChildrenFromStore(String
path) {
+ return null;
+ }
+
+ @Override
+ protected CompletableFuture<Boolean> existsFromStore(String path) {
+ return null;
+ }
+
+ @Override
+ protected CompletableFuture<Optional<GetResult>> storeGet(String path)
{
+ return null;
+ }
+
+ @Override
+ protected CompletableFuture<Void> storeDelete(String path,
Optional<Long> expectedVersion) {
+ return null;
+ }
+
+ @Override
+ protected CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
+ EnumSet<CreateOption>
options) {
+ return null;
+ }
+ }
+
+
+}