asafm commented on code in PR #19208: URL: https://github.com/apache/pulsar/pull/19208#discussion_r1070531001
########## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java: ########## @@ -18,13 +18,30 @@ */ package org.apache.pulsar.metadata.impl; +import static org.apache.pulsar.metadata.impl.EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER; +import static org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER; +import static org.apache.pulsar.metadata.impl.RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER; +import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER; +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreProvider; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +@Slf4j public class MetadataStoreFactoryImpl { + public static final String METADATASTORE_PROVIDERS_PROPERTY = "pulsar.metadatastore.providers"; + + private static Map<String, MetadataStoreProvider> providers; + + static { + loadProviders(); Review Comment: This is not good for testing and, in conjunction with the comment below, IMO, not good for production code. In tests, you fill this static map which will never be emptied. Carrying over leftovers from one test to the next eventually creates flaky tests and unexpected behavior, especially over time with many people working on the same code base IMO. I suggest making this class stateful and instantiating it once when Pulsar boots. This solves the issue of the tests and creates a more streamlined code. ########## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java: ########## @@ -45,19 +62,50 @@ public static MetadataStoreExtended createExtended(String metadataURL, MetadataS private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws MetadataStoreException { + MetadataStoreProvider provider = findProvider(metadataURL); + if (provider != null) { + return provider.create(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + + static void loadProviders() { + String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY); + providers = new HashMap<>(); + providers.put(MEMORY_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(MEMORY_SCHEME_IDENTIFIER, + (url, config, enableWatchers) -> new LocalMemoryMetadataStore(url, config))); + providers.put(ROCKSDB_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ROCKSDB_SCHEME_IDENTIFIER, + (url, config, enableWatchers) -> RocksdbMetadataStore.get(url, config))); + providers.put(ETCD_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ETCD_SCHEME_IDENTIFIER, + EtcdMetadataStore::new)); + providers.put(ZK_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ZK_SCHEME_IDENTIFIER, + (url, config, enableWatchers) -> + new ZKMetadataStore(url.substring(ZK_SCHEME_IDENTIFIER.length()), config, enableWatchers))); + + if (factoryClasses == null) { + return; + } + String[] classNames = factoryClasses.split(","); + for (String className : classNames) { + try { + Class<? extends MetadataStoreProvider> clazz = + (Class<? extends MetadataStoreProvider>) Class.forName(className); + MetadataStoreProvider provider = clazz.getConstructor().newInstance(); + String scheme = provider.urlScheme(); + providers.put(scheme, provider); + } catch (Exception e) { + log.warn("Failed to load metadata store provider class for name '{}'", className, e); + } + } + } - if (metadataURL.startsWith(LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER)) { - return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig); - } else if (metadataURL.startsWith(RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER)) { - return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig); - } else if (metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) { - return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); - } else if (metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) { - return new ZKMetadataStore(metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length()), - metadataStoreConfig, enableSessionWatcher); - } else { - return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); + private static MetadataStoreProvider findProvider(String metadataURL) { + for (Map.Entry<String, MetadataStoreProvider> entry : providers.entrySet()) { + if (metadataURL.startsWith(entry.getKey())) { Review Comment: Something doesn't add up here: When you register the known providers, you place them in the map with the colons: ``` providers.put(ETCD_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ETCD_SCHEME_IDENTIFIER, ``` ``` static final String ETCD_SCHEME_IDENTIFIER = "etcd:"; ``` In the interface someone needs to implemented it has: ``` public interface MetadataStoreProvider { /** * Scheme of the urls that MetadataSores provided by this object can handle. */ String urlScheme(); ``` Normally a scheme is just a name without colons, so I believe this will be confusing. I suggest: 1. Use the scheme names in the map. 2. Interface method `urlScheme()` will return just scheme names without colons 3. Search from the beginning of the string to the first occurrence of ":"; get that string without colons, which is your scheme. 4. If the scheme doesn't exist, assume the scheme is "zk" and retrieve based on it. ########## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java: ########## @@ -45,19 +62,50 @@ public static MetadataStoreExtended createExtended(String metadataURL, MetadataS private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws MetadataStoreException { + MetadataStoreProvider provider = findProvider(metadataURL); + if (provider != null) { + return provider.create(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + + static void loadProviders() { + String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY); Review Comment: Why are you using a System Property and not the standard pulsar configuration, which can be loaded from a file and documented properly? ########## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java: ########## @@ -45,19 +62,50 @@ public static MetadataStoreExtended createExtended(String metadataURL, MetadataS private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) throws MetadataStoreException { + MetadataStoreProvider provider = findProvider(metadataURL); + if (provider != null) { + return provider.create(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); + } + + static void loadProviders() { + String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY); + providers = new HashMap<>(); + providers.put(MEMORY_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(MEMORY_SCHEME_IDENTIFIER, + (url, config, enableWatchers) -> new LocalMemoryMetadataStore(url, config))); + providers.put(ROCKSDB_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ROCKSDB_SCHEME_IDENTIFIER, + (url, config, enableWatchers) -> RocksdbMetadataStore.get(url, config))); + providers.put(ETCD_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ETCD_SCHEME_IDENTIFIER, + EtcdMetadataStore::new)); + providers.put(ZK_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ZK_SCHEME_IDENTIFIER, + (url, config, enableWatchers) -> + new ZKMetadataStore(url.substring(ZK_SCHEME_IDENTIFIER.length()), config, enableWatchers))); + + if (factoryClasses == null) { + return; + } + String[] classNames = factoryClasses.split(","); Review Comment: Let's be more user-friendly - Trim spaces before or after the separator. Guava has great utility for that. Let's ignore empty values if placed by mistake., ########## pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DefaultMetadataStoreProvider.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl; + +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreProvider; + +public class DefaultMetadataStoreProvider implements MetadataStoreProvider { + + public interface MetadataStoreConstructor { Review Comment: I think this creates an extra hop of confusion - it took me a while to figure out what this is doing here. When you see `MetadataStoreProvider`, you almost automatically expect to see the different implementations per store. The fact that we indirectly place them in another interface named `MetadataStoreConstructor` is confusing. I suggest having an implementation per store of `MetadataStoreProvider`. It's very short anyway, two methods, each having one line. IMO it will lower the confusion and be aligned with the external implementations, which will have one per store. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org