asafm commented on code in PR #19208:
URL: https://github.com/apache/pulsar/pull/19208#discussion_r1070531001


##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java:
##########
@@ -18,13 +18,30 @@
  */
 package org.apache.pulsar.metadata.impl;
 
+import static org.apache.pulsar.metadata.impl.EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER;
+import static org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER;
+import static org.apache.pulsar.metadata.impl.RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER;
+import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
+@Slf4j
 public class MetadataStoreFactoryImpl {
 
+    public static final String METADATASTORE_PROVIDERS_PROPERTY = "pulsar.metadatastore.providers";
+
+    private static Map<String, MetadataStoreProvider> providers;
+
+    static {
+        loadProviders();

Review Comment:
   This is not good for testing and, in conjunction with the comment below, IMO not good for production code either.
   In tests, you fill this static map, which is never emptied. Leftovers carried over from one test to the next eventually cause flaky tests and unexpected behavior, especially over time with many people working on the same code base.
   
   I suggest making this class stateful and instantiating it once when Pulsar boots. This solves the testing issue and makes the code more streamlined.
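
A minimal sketch of the stateful idea, not the actual Pulsar code: `ProviderRegistry` is a hypothetical name, and a `Function<String, String>` stands in for a real provider so the example compiles on its own. The point is only that the map is instance state, so each test can build a fresh registry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the static provider map: the state lives in an
// instance, so nothing leaks from one test (or one broker) to the next.
final class ProviderRegistry {
    // Scheme name -> factory. A String result stands in for a real MetadataStore.
    private final Map<String, Function<String, String>> providers = new HashMap<>();

    void register(String scheme, Function<String, String> factory) {
        providers.put(scheme, factory);
    }

    boolean isRegistered(String scheme) {
        return providers.containsKey(scheme);
    }
}
```

With this shape, a provider registered in one test is invisible to a registry created in the next test.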



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java:
##########
@@ -45,19 +62,50 @@ public static MetadataStoreExtended createExtended(String metadataURL, MetadataS
     private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig,
                                              boolean enableSessionWatcher)
             throws MetadataStoreException {
+        MetadataStoreProvider provider = findProvider(metadataURL);
+        if (provider != null) {
+            return provider.create(metadataURL, metadataStoreConfig, enableSessionWatcher);
+        }
+        return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+    }
+
+    static void loadProviders() {
+        String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY);
+        providers = new HashMap<>();
+        providers.put(MEMORY_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(MEMORY_SCHEME_IDENTIFIER,
+                (url, config, enableWatchers) -> new LocalMemoryMetadataStore(url, config)));
+        providers.put(ROCKSDB_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ROCKSDB_SCHEME_IDENTIFIER,
+                (url, config, enableWatchers) -> RocksdbMetadataStore.get(url, config)));
+        providers.put(ETCD_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ETCD_SCHEME_IDENTIFIER,
+                EtcdMetadataStore::new));
+        providers.put(ZK_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ZK_SCHEME_IDENTIFIER,
+                (url, config, enableWatchers) ->
+                        new ZKMetadataStore(url.substring(ZK_SCHEME_IDENTIFIER.length()), config, enableWatchers)));
+
+        if (factoryClasses == null) {
+            return;
+        }
+        String[] classNames = factoryClasses.split(",");
+        for (String className : classNames) {
+            try {
+                Class<? extends MetadataStoreProvider> clazz =
+                        (Class<? extends MetadataStoreProvider>) Class.forName(className);
+                MetadataStoreProvider provider = clazz.getConstructor().newInstance();
+                String scheme = provider.urlScheme();
+                providers.put(scheme, provider);
+            } catch (Exception e) {
+                log.warn("Failed to load metadata store provider class for name '{}'", className, e);
+            }
+        }
+    }
 
-        if (metadataURL.startsWith(LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER)) {
-            return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
-        } else if (metadataURL.startsWith(RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER)) {
-            return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
-        } else if (metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) {
-            return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
-        } else if (metadataURL.startsWith(ZKMetadataStore.ZK_SCHEME_IDENTIFIER)) {
-            return new ZKMetadataStore(metadataURL.substring(ZKMetadataStore.ZK_SCHEME_IDENTIFIER.length()),
-                    metadataStoreConfig, enableSessionWatcher);
-        } else {
-            return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+    private static MetadataStoreProvider findProvider(String metadataURL) {
+        for (Map.Entry<String, MetadataStoreProvider> entry : providers.entrySet()) {
+            if (metadataURL.startsWith(entry.getKey())) {

Review Comment:
   Something doesn't add up here: When you register the known providers, you 
place them in the map with the colons:
   
   ```
   providers.put(ETCD_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ETCD_SCHEME_IDENTIFIER,
   ```
   
   ```
   static final String ETCD_SCHEME_IDENTIFIER = "etcd:";
   ```
   
   The interface that someone needs to implement has:
   
   ```
   public interface MetadataStoreProvider {
   
   
       /**
        * Scheme of the urls that MetadataSores provided by this object can handle.
        */
       String urlScheme();
   ```
   
   Normally a scheme is just a name without colons, so I believe this will be 
confusing.
   
   
   I suggest:
   
   1. Use the scheme names in the map.
   2. Interface method `urlScheme()` will return just scheme names without 
colons
   3. Search from the beginning of the string to the first occurrence of ":"; 
get that string without colons, which is your scheme.
   4. If the scheme doesn't exist, assume the scheme is "zk" and retrieve based 
on it.
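
The four steps above could be sketched roughly as follows. `SchemeResolver` and `knownSchemes` are hypothetical names, not part of the PR. One assumption worth calling out: a bare ZooKeeper connect string such as `localhost:2181` also contains a colon, so the sketch falls back to "zk" whenever the text before the first colon is not a registered scheme.

```java
import java.util.Set;

// Hypothetical helper: resolves the scheme name (without colon) of a metadata URL.
final class SchemeResolver {
    static final String DEFAULT_SCHEME = "zk";

    static String resolve(String metadataURL, Set<String> knownSchemes) {
        // Take everything before the first ':' as the candidate scheme.
        int colon = metadataURL.indexOf(':');
        if (colon > 0) {
            String candidate = metadataURL.substring(0, colon);
            if (knownSchemes.contains(candidate)) {
                return candidate;
            }
        }
        // No colon, or an unregistered prefix such as a host name: assume ZooKeeper.
        return DEFAULT_SCHEME;
    }
}
```

The map lookup then becomes a single `get` on the resolved scheme instead of a `startsWith` scan over all entries.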
   



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java:
##########
@@ -45,19 +62,50 @@ public static MetadataStoreExtended createExtended(String metadataURL, MetadataS
     private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig,
                                              boolean enableSessionWatcher)
             throws MetadataStoreException {
+        MetadataStoreProvider provider = findProvider(metadataURL);
+        if (provider != null) {
+            return provider.create(metadataURL, metadataStoreConfig, enableSessionWatcher);
+        }
+        return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+    }
+
+    static void loadProviders() {
+        String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY);

Review Comment:
   Why are you using a System Property and not the standard pulsar 
configuration, which can be loaded from a file and documented properly?



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java:
##########
@@ -45,19 +62,50 @@ public static MetadataStoreExtended createExtended(String metadataURL, MetadataS
     private static MetadataStore newInstance(String metadataURL, MetadataStoreConfig metadataStoreConfig,
                                              boolean enableSessionWatcher)
             throws MetadataStoreException {
+        MetadataStoreProvider provider = findProvider(metadataURL);
+        if (provider != null) {
+            return provider.create(metadataURL, metadataStoreConfig, enableSessionWatcher);
+        }
+        return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+    }
+
+    static void loadProviders() {
+        String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY);
+        providers = new HashMap<>();
+        providers.put(MEMORY_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(MEMORY_SCHEME_IDENTIFIER,
+                (url, config, enableWatchers) -> new LocalMemoryMetadataStore(url, config)));
+        providers.put(ROCKSDB_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ROCKSDB_SCHEME_IDENTIFIER,
+                (url, config, enableWatchers) -> RocksdbMetadataStore.get(url, config)));
+        providers.put(ETCD_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ETCD_SCHEME_IDENTIFIER,
+                EtcdMetadataStore::new));
+        providers.put(ZK_SCHEME_IDENTIFIER, new DefaultMetadataStoreProvider(ZK_SCHEME_IDENTIFIER,
+                (url, config, enableWatchers) ->
+                        new ZKMetadataStore(url.substring(ZK_SCHEME_IDENTIFIER.length()), config, enableWatchers)));
+
+        if (factoryClasses == null) {
+            return;
+        }
+        String[] classNames = factoryClasses.split(",");

Review Comment:
   Let's be more user-friendly: trim spaces before and after the separator. Guava has a great utility for that.
   Let's also ignore empty values if placed by mistake.
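
A rough, JDK-only sketch of the lenient parsing suggested here, so it runs without extra dependencies. `ProviderClassNames` is a hypothetical helper name. With Guava on the classpath, `Splitter.on(',').trimResults().omitEmptyStrings().splitToList(raw)` should give the same result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: parses a comma-separated list of provider class names,
// trimming whitespace around each entry and dropping empty entries.
final class ProviderClassNames {
    static List<String> parse(String raw) {
        if (raw == null) {
            return List.of();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }
}
```

For example, an input with stray spaces, a blank entry, and a trailing comma would yield only the real class names.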



##########
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DefaultMetadataStoreProvider.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+
+public class DefaultMetadataStoreProvider implements MetadataStoreProvider {
+
+    public interface MetadataStoreConstructor {

Review Comment:
   I think this creates an extra hop of confusion - it took me a while to 
figure out what this is doing here.
   When you see `MetadataStoreProvider`, you almost automatically expect to see 
the different implementations per store.
   The fact that we indirectly place them in another interface named 
`MetadataStoreConstructor` is confusing.
   
   I suggest having an implementation per store of `MetadataStoreProvider`. 
It's very short anyway, two methods, each having one line. 
   
   IMO it will lower the confusion and be aligned with the external 
implementations, which will have one per store.
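
To illustrate, one provider class per store might look roughly like this. All types here (`Store`, `StoreProvider`, `MemoryStore`, `MemoryStoreProvider`) are simplified, hypothetical stand-ins for `MetadataStore`, `MetadataStoreProvider`, and the in-memory store, so that the sketch compiles on its own; the real `create` method also takes a `MetadataStoreConfig`.

```java
// Hypothetical, simplified stand-ins for the real Pulsar types.
interface Store { }

interface StoreProvider {
    String urlScheme();

    Store create(String url, boolean enableSessionWatcher);
}

final class MemoryStore implements Store {
    final String url;

    MemoryStore(String url) {
        this.url = url;
    }
}

// One small provider class per store, with no intermediate constructor interface.
final class MemoryStoreProvider implements StoreProvider {
    @Override
    public String urlScheme() {
        return "memory";
    }

    @Override
    public Store create(String url, boolean enableSessionWatcher) {
        // The in-memory store ignores the session watcher flag in this sketch.
        return new MemoryStore(url);
    }
}
```

Built-in and external stores would then follow the same pattern, which is what a reader of the provider interface is likely to expect.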
   


