This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f25843  Allow to have different instances LocalMemoryMetadataStore that share the same state (#12390)
9f25843 is described below

commit 9f258439da65b11f2a8425ced63b6b50f0c9fc09
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Oct 25 14:30:15 2021 -0700

    Allow to have different instances LocalMemoryMetadataStore that share the same state (#12390)
---
 .../metadata/impl/LocalMemoryMetadataStore.java    | 227 ++++++++++++---------
 .../pulsar/metadata/BaseMetadataStoreTest.java     |   3 +-
 .../metadata/LocalMemoryMetadataStoreTest.java     |  90 ++++++++
 3 files changed, 227 insertions(+), 93 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index bf4a225..a683bc9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -18,17 +18,23 @@
  */
 package org.apache.pulsar.metadata.impl;
 
+import com.google.common.collect.MapMaker;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import lombok.Data;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -41,6 +47,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
+@Slf4j
 public class LocalMemoryMetadataStore extends AbstractMetadataStore implements 
MetadataStoreExtended {
 
     @Data
@@ -55,56 +62,85 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     private final NavigableMap<String, Value> map;
     private final AtomicLong sequentialIdGenerator;
 
-    public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig) {
-        map = new TreeMap<>();
-        sequentialIdGenerator = new AtomicLong();
-    }
-
-    @Override
-    public synchronized CompletableFuture<Optional<GetResult>> get(String 
path) {
-        if (!isValidPath(path)) {
-            return FutureUtils.exception(new MetadataStoreException(""));
+    private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS 
= new MapMaker()
+            .weakValues().makeMap();
+    private static final Map<String, AtomicLong> STATIC_ID_GEN_MAP = new 
MapMaker()
+            .weakValues().makeMap();
+
+    public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
+            throws MetadataStoreException {
+        URI uri;
+        try {
+            uri = new URI(metadataURL);
+        } catch (URISyntaxException e) {
+            throw new MetadataStoreException(e);
         }
 
-        Value v = map.get(path);
-        if (v != null) {
-            return FutureUtils.value(
-                    Optional.of(new GetResult(v.data, new Stat(path, 
v.version, v.createdTimestamp, v.modifiedTimestamp,
-                            v.isEphemeral(), true))));
+        // Local means a private data set
+        if ("local".equals(uri.getHost())) {
+            map = new TreeMap<>();
+            sequentialIdGenerator = new AtomicLong();
         } else {
-            return FutureUtils.value(Optional.empty());
+            // Use a reference from a shared data set
+            String name = uri.getHost();
+            map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>());
+            sequentialIdGenerator = STATIC_ID_GEN_MAP.computeIfAbsent(name, __ 
-> new AtomicLong());
+            log.info("Created LocalMemoryDataStore for '{}'", name);
         }
     }
 
     @Override
-    public synchronized CompletableFuture<List<String>> 
getChildrenFromStore(String path) {
-        if (!isValidPath(path)) {
-            return FutureUtils.exception(new MetadataStoreException(""));
-        }
+    public CompletableFuture<Optional<GetResult>> get(String path) {
+        synchronized (map) {
+            if (!isValidPath(path)) {
+                return FutureUtils.exception(new MetadataStoreException(""));
+            }
 
-        String firstKey = path.equals("/") ? path : path + "/";
-        String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is 
lexicographically just after '/'
+            Value v = map.get(path);
+            if (v != null) {
+                return FutureUtils.value(
+                        Optional.of(
+                                new GetResult(v.data, new Stat(path, 
v.version, v.createdTimestamp, v.modifiedTimestamp,
+                                        v.isEphemeral(), true))));
+            } else {
+                return FutureUtils.value(Optional.empty());
+            }
+        }
+    }
 
-        List<String> children = new ArrayList<>();
-        map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
-            String relativePath = key.replace(firstKey, "");
-            if (!relativePath.contains("/")) {
-                // Only return first-level children
-                children.add(relativePath);
+    @Override
+    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+        synchronized (map) {
+            if (!isValidPath(path)) {
+                return FutureUtils.exception(new MetadataStoreException(""));
             }
-        });
 
-        return FutureUtils.value(children);
+            String firstKey = path.equals("/") ? path : path + "/";
+            String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is 
lexicographically just after '/'
+
+            List<String> children = new ArrayList<>();
+            map.subMap(firstKey, false, lastKey, false).forEach((key, value) 
-> {
+                String relativePath = key.replace(firstKey, "");
+                if (!relativePath.contains("/")) {
+                    // Only return first-level children
+                    children.add(relativePath);
+                }
+            });
+
+            return FutureUtils.value(children);
+        }
     }
 
     @Override
-    public synchronized CompletableFuture<Boolean> existsFromStore(String 
path) {
-        if (!isValidPath(path)) {
-            return FutureUtils.exception(new MetadataStoreException(""));
-        }
+    public CompletableFuture<Boolean> existsFromStore(String path) {
+        synchronized (map) {
+            if (!isValidPath(path)) {
+                return FutureUtils.exception(new MetadataStoreException(""));
+            }
 
-        Value v = map.get(path);
-        return FutureUtils.value(v != null ? true : false);
+            Value v = map.get(path);
+            return FutureUtils.value(v != null ? true : false);
+        }
     }
 
     @Override
@@ -113,79 +149,86 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     }
 
     @Override
-    public synchronized CompletableFuture<Stat> storePut(String path, byte[] 
data, Optional<Long> optExpectedVersion,
-            EnumSet<CreateOption> options) {
-        if (!isValidPath(path)) {
-            return FutureUtils.exception(new MetadataStoreException(""));
-        }
+    public CompletableFuture<Stat> storePut(String path, byte[] data, 
Optional<Long> optExpectedVersion,
+                                            EnumSet<CreateOption> options) {
+        synchronized (map) {
+            if (!isValidPath(path)) {
+                return FutureUtils.exception(new MetadataStoreException(""));
+            }
 
-        boolean hasVersion = optExpectedVersion.isPresent();
-        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
+            boolean hasVersion = optExpectedVersion.isPresent();
+            int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
 
-        if (options.contains(CreateOption.Sequential)) {
-            path += Long.toString(sequentialIdGenerator.getAndIncrement());
-        }
+            if (options.contains(CreateOption.Sequential)) {
+                path += Long.toString(sequentialIdGenerator.getAndIncrement());
+            }
 
-        long now = System.currentTimeMillis();
+            long now = System.currentTimeMillis();
 
-        if (hasVersion && expectedVersion == -1) {
-            Value newValue = new Value(0, data, now, now, 
options.contains(CreateOption.Ephemeral));
-            Value existingValue = map.putIfAbsent(path, newValue);
-            if (existingValue != null) {
-                return FutureUtils.exception(new BadVersionException(""));
-            } else {
-                receivedNotification(new 
Notification(NotificationType.Created, path));
-                String parent = parent(path);
-                if (parent != null) {
-                    receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
-                }
-                return FutureUtils.value(new Stat(path, 0, now, now, 
newValue.isEphemeral(), true));
-            }
-        } else {
-            Value existingValue = map.get(path);
-            long existingVersion = existingValue != null ? 
existingValue.version : -1;
-            if (hasVersion && expectedVersion != existingVersion) {
-                return FutureUtils.exception(new BadVersionException(""));
-            } else {
-                long newVersion = existingValue != null ? 
existingValue.version + 1 : 0;
-                long createdTimestamp = existingValue != null ? 
existingValue.createdTimestamp : now;
-                Value newValue = new Value(newVersion, data, createdTimestamp, 
now,
-                        options.contains(CreateOption.Ephemeral));
-                map.put(path, newValue);
-
-                NotificationType type = existingValue == null ? 
NotificationType.Created : NotificationType.Modified;
-                receivedNotification(new Notification(type, path));
-                if (type == NotificationType.Created) {
+            if (hasVersion && expectedVersion == -1) {
+                Value newValue = new Value(0, data, now, now, 
options.contains(CreateOption.Ephemeral));
+                Value existingValue = map.putIfAbsent(path, newValue);
+                if (existingValue != null) {
+                    return FutureUtils.exception(new BadVersionException(""));
+                } else {
+                    receivedNotification(new 
Notification(NotificationType.Created, path));
                     String parent = parent(path);
                     if (parent != null) {
                         receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
                     }
+                    return FutureUtils.value(new Stat(path, 0, now, now, 
newValue.isEphemeral(), true));
+                }
+            } else {
+                Value existingValue = map.get(path);
+                long existingVersion = existingValue != null ? 
existingValue.version : -1;
+                if (hasVersion && expectedVersion != existingVersion) {
+                    return FutureUtils.exception(new BadVersionException(""));
+                } else {
+                    long newVersion = existingValue != null ? 
existingValue.version + 1 : 0;
+                    long createdTimestamp = existingValue != null ? 
existingValue.createdTimestamp : now;
+                    Value newValue = new Value(newVersion, data, 
createdTimestamp, now,
+                            options.contains(CreateOption.Ephemeral));
+                    map.put(path, newValue);
+
+                    NotificationType type =
+                            existingValue == null ? NotificationType.Created : 
NotificationType.Modified;
+                    receivedNotification(new Notification(type, path));
+                    if (type == NotificationType.Created) {
+                        String parent = parent(path);
+                        if (parent != null) {
+                            receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
+                        }
+                    }
+                    return FutureUtils
+                            .value(new Stat(path, newValue.version, 
newValue.createdTimestamp,
+                                    newValue.modifiedTimestamp,
+                                    false, true));
                 }
-                return FutureUtils
-                        .value(new Stat(path, newValue.version, 
newValue.createdTimestamp, newValue.modifiedTimestamp, false, true));
             }
         }
     }
 
     @Override
-    public synchronized CompletableFuture<Void> storeDelete(String path, 
Optional<Long> optExpectedVersion) {
-        if (!isValidPath(path)) {
-            return FutureUtils.exception(new MetadataStoreException(""));
-        }
+    public CompletableFuture<Void> storeDelete(String path, Optional<Long> 
optExpectedVersion) {
+        synchronized (map) {
+            if (!isValidPath(path)) {
+                return FutureUtils.exception(new MetadataStoreException(""));
+            }
 
-        Value value = map.get(path);
-        if (value == null) {
-            return FutureUtils.exception(new NotFoundException(""));
-        } else if (optExpectedVersion.isPresent() && optExpectedVersion.get() 
!= value.version) {
-            return FutureUtils.exception(new BadVersionException(""));
-        } else {
-            map.remove(path);
-            receivedNotification(new Notification(NotificationType.Deleted, 
path));
-            String parent = parent(path);
-            if (parent != null) {
-                receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
+            Value value = map.get(path);
+            if (value == null) {
+                return FutureUtils.exception(new NotFoundException(""));
+            } else if (optExpectedVersion.isPresent() && 
optExpectedVersion.get() != value.version) {
+                return FutureUtils.exception(new BadVersionException(""));
+            } else {
+                map.remove(path);
+                receivedNotification(new 
Notification(NotificationType.Deleted, path));
+                String parent = parent(path);
+                if (parent != null) {
+                    receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
+                }
+                return FutureUtils.value(null);
             }
-            return FutureUtils.value(null);
         }
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 1911725..5cb6e5b 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertTrue;
+import java.util.UUID;
 import java.util.concurrent.CompletionException;
 import java.util.function.Supplier;
 import org.apache.pulsar.tests.TestRetrySupport;
@@ -52,7 +53,7 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
         // Supplier<String> lambda is used for providing the value.
         return new Object[][] {
                 { "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) 
},
-                { "Memory", stringSupplier(() -> "memory://local") },
+                { "Memory", stringSupplier(() -> "memory://" + 
UUID.randomUUID()) },
         };
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java
new file mode 100644
index 0000000..0016d99
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.testng.annotations.Test;
+
+public class LocalMemoryMetadataStoreTest {
+
+    @Test
+    public void testPrivateInstance() throws Exception {
+        @Cleanup
+        MetadataStore store1 = MetadataStoreFactory.create("memory://local",
+                MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStore store2 = MetadataStoreFactory.create("memory://local",
+                MetadataStoreConfig.builder().build());
+
+        store1.put("/test", "value".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
+
+        assertTrue(store1.exists("/test").join());
+        assertFalse(store2.exists("/test").join());
+    }
+
+    @Test
+    public void testSharedInstance() throws Exception {
+        String url = "memory://" + UUID.randomUUID();
+
+        @Cleanup
+        MetadataStore store1 = MetadataStoreFactory.create(url,
+                MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStore store2 = MetadataStoreFactory.create(url,
+                MetadataStoreConfig.builder().build());
+
+        store1.put("/test", "value".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
+
+        assertTrue(store1.exists("/test").join());
+        assertTrue(store2.exists("/test").join());
+
+        store2.delete("/test", Optional.empty()).join();
+
+        assertFalse(store1.exists("/test").join());
+        assertFalse(store2.exists("/test").join());
+    }
+}

Reply via email to