This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f25843 Allow to have different instances of LocalMemoryMetadataStore
that share the same state (#12390)
9f25843 is described below
commit 9f258439da65b11f2a8425ced63b6b50f0c9fc09
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Oct 25 14:30:15 2021 -0700
Allow to have different instances of LocalMemoryMetadataStore that share the
same state (#12390)
---
.../metadata/impl/LocalMemoryMetadataStore.java | 227 ++++++++++++---------
.../pulsar/metadata/BaseMetadataStoreTest.java | 3 +-
.../metadata/LocalMemoryMetadataStoreTest.java | 90 ++++++++
3 files changed, 227 insertions(+), 93 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index bf4a225..a683bc9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -18,17 +18,23 @@
*/
package org.apache.pulsar.metadata.impl;
+import com.google.common.collect.MapMaker;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -41,6 +47,7 @@ import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+@Slf4j
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements
MetadataStoreExtended {
@Data
@@ -55,56 +62,85 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
private final NavigableMap<String, Value> map;
private final AtomicLong sequentialIdGenerator;
- public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig) {
- map = new TreeMap<>();
- sequentialIdGenerator = new AtomicLong();
- }
-
- @Override
- public synchronized CompletableFuture<Optional<GetResult>> get(String
path) {
- if (!isValidPath(path)) {
- return FutureUtils.exception(new MetadataStoreException(""));
+ private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS
= new MapMaker()
+ .weakValues().makeMap();
+ private static final Map<String, AtomicLong> STATIC_ID_GEN_MAP = new
MapMaker()
+ .weakValues().makeMap();
+
+ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
+ throws MetadataStoreException {
+ URI uri;
+ try {
+ uri = new URI(metadataURL);
+ } catch (URISyntaxException e) {
+ throw new MetadataStoreException(e);
}
- Value v = map.get(path);
- if (v != null) {
- return FutureUtils.value(
- Optional.of(new GetResult(v.data, new Stat(path,
v.version, v.createdTimestamp, v.modifiedTimestamp,
- v.isEphemeral(), true))));
+ // Local means a private data set
+ if ("local".equals(uri.getHost())) {
+ map = new TreeMap<>();
+ sequentialIdGenerator = new AtomicLong();
} else {
- return FutureUtils.value(Optional.empty());
+ // Use a reference from a shared data set
+ String name = uri.getHost();
+ map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>());
+ sequentialIdGenerator = STATIC_ID_GEN_MAP.computeIfAbsent(name, __
-> new AtomicLong());
+ log.info("Created LocalMemoryDataStore for '{}'", name);
}
}
@Override
- public synchronized CompletableFuture<List<String>>
getChildrenFromStore(String path) {
- if (!isValidPath(path)) {
- return FutureUtils.exception(new MetadataStoreException(""));
- }
+ public CompletableFuture<Optional<GetResult>> get(String path) {
+ synchronized (map) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
- String firstKey = path.equals("/") ? path : path + "/";
- String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is
lexicographically just after '/'
+ Value v = map.get(path);
+ if (v != null) {
+ return FutureUtils.value(
+ Optional.of(
+ new GetResult(v.data, new Stat(path,
v.version, v.createdTimestamp, v.modifiedTimestamp,
+ v.isEphemeral(), true))));
+ } else {
+ return FutureUtils.value(Optional.empty());
+ }
+ }
+ }
- List<String> children = new ArrayList<>();
- map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
- String relativePath = key.replace(firstKey, "");
- if (!relativePath.contains("/")) {
- // Only return first-level children
- children.add(relativePath);
+ @Override
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
+ synchronized (map) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
}
- });
- return FutureUtils.value(children);
+ String firstKey = path.equals("/") ? path : path + "/";
+ String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is
lexicographically just after '/'
+
+ List<String> children = new ArrayList<>();
+ map.subMap(firstKey, false, lastKey, false).forEach((key, value)
-> {
+ String relativePath = key.replace(firstKey, "");
+ if (!relativePath.contains("/")) {
+ // Only return first-level children
+ children.add(relativePath);
+ }
+ });
+
+ return FutureUtils.value(children);
+ }
}
@Override
- public synchronized CompletableFuture<Boolean> existsFromStore(String
path) {
- if (!isValidPath(path)) {
- return FutureUtils.exception(new MetadataStoreException(""));
- }
+ public CompletableFuture<Boolean> existsFromStore(String path) {
+ synchronized (map) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
- Value v = map.get(path);
- return FutureUtils.value(v != null ? true : false);
+ Value v = map.get(path);
+ return FutureUtils.value(v != null ? true : false);
+ }
}
@Override
@@ -113,79 +149,86 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
@Override
- public synchronized CompletableFuture<Stat> storePut(String path, byte[]
data, Optional<Long> optExpectedVersion,
- EnumSet<CreateOption> options) {
- if (!isValidPath(path)) {
- return FutureUtils.exception(new MetadataStoreException(""));
- }
+ public CompletableFuture<Stat> storePut(String path, byte[] data,
Optional<Long> optExpectedVersion,
+ EnumSet<CreateOption> options) {
+ synchronized (map) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
- boolean hasVersion = optExpectedVersion.isPresent();
- int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
+ boolean hasVersion = optExpectedVersion.isPresent();
+ int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
- if (options.contains(CreateOption.Sequential)) {
- path += Long.toString(sequentialIdGenerator.getAndIncrement());
- }
+ if (options.contains(CreateOption.Sequential)) {
+ path += Long.toString(sequentialIdGenerator.getAndIncrement());
+ }
- long now = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
- if (hasVersion && expectedVersion == -1) {
- Value newValue = new Value(0, data, now, now,
options.contains(CreateOption.Ephemeral));
- Value existingValue = map.putIfAbsent(path, newValue);
- if (existingValue != null) {
- return FutureUtils.exception(new BadVersionException(""));
- } else {
- receivedNotification(new
Notification(NotificationType.Created, path));
- String parent = parent(path);
- if (parent != null) {
- receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
- }
- return FutureUtils.value(new Stat(path, 0, now, now,
newValue.isEphemeral(), true));
- }
- } else {
- Value existingValue = map.get(path);
- long existingVersion = existingValue != null ?
existingValue.version : -1;
- if (hasVersion && expectedVersion != existingVersion) {
- return FutureUtils.exception(new BadVersionException(""));
- } else {
- long newVersion = existingValue != null ?
existingValue.version + 1 : 0;
- long createdTimestamp = existingValue != null ?
existingValue.createdTimestamp : now;
- Value newValue = new Value(newVersion, data, createdTimestamp,
now,
- options.contains(CreateOption.Ephemeral));
- map.put(path, newValue);
-
- NotificationType type = existingValue == null ?
NotificationType.Created : NotificationType.Modified;
- receivedNotification(new Notification(type, path));
- if (type == NotificationType.Created) {
+ if (hasVersion && expectedVersion == -1) {
+ Value newValue = new Value(0, data, now, now,
options.contains(CreateOption.Ephemeral));
+ Value existingValue = map.putIfAbsent(path, newValue);
+ if (existingValue != null) {
+ return FutureUtils.exception(new BadVersionException(""));
+ } else {
+ receivedNotification(new
Notification(NotificationType.Created, path));
String parent = parent(path);
if (parent != null) {
receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
}
+ return FutureUtils.value(new Stat(path, 0, now, now,
newValue.isEphemeral(), true));
+ }
+ } else {
+ Value existingValue = map.get(path);
+ long existingVersion = existingValue != null ?
existingValue.version : -1;
+ if (hasVersion && expectedVersion != existingVersion) {
+ return FutureUtils.exception(new BadVersionException(""));
+ } else {
+ long newVersion = existingValue != null ?
existingValue.version + 1 : 0;
+ long createdTimestamp = existingValue != null ?
existingValue.createdTimestamp : now;
+ Value newValue = new Value(newVersion, data,
createdTimestamp, now,
+ options.contains(CreateOption.Ephemeral));
+ map.put(path, newValue);
+
+ NotificationType type =
+ existingValue == null ? NotificationType.Created :
NotificationType.Modified;
+ receivedNotification(new Notification(type, path));
+ if (type == NotificationType.Created) {
+ String parent = parent(path);
+ if (parent != null) {
+ receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ }
+ }
+ return FutureUtils
+ .value(new Stat(path, newValue.version,
newValue.createdTimestamp,
+ newValue.modifiedTimestamp,
+ false, true));
}
- return FutureUtils
- .value(new Stat(path, newValue.version,
newValue.createdTimestamp, newValue.modifiedTimestamp, false, true));
}
}
}
@Override
- public synchronized CompletableFuture<Void> storeDelete(String path,
Optional<Long> optExpectedVersion) {
- if (!isValidPath(path)) {
- return FutureUtils.exception(new MetadataStoreException(""));
- }
+ public CompletableFuture<Void> storeDelete(String path, Optional<Long>
optExpectedVersion) {
+ synchronized (map) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
- Value value = map.get(path);
- if (value == null) {
- return FutureUtils.exception(new NotFoundException(""));
- } else if (optExpectedVersion.isPresent() && optExpectedVersion.get()
!= value.version) {
- return FutureUtils.exception(new BadVersionException(""));
- } else {
- map.remove(path);
- receivedNotification(new Notification(NotificationType.Deleted,
path));
- String parent = parent(path);
- if (parent != null) {
- receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ Value value = map.get(path);
+ if (value == null) {
+ return FutureUtils.exception(new NotFoundException(""));
+ } else if (optExpectedVersion.isPresent() &&
optExpectedVersion.get() != value.version) {
+ return FutureUtils.exception(new BadVersionException(""));
+ } else {
+ map.remove(path);
+ receivedNotification(new
Notification(NotificationType.Deleted, path));
+ String parent = parent(path);
+ if (parent != null) {
+ receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ }
+ return FutureUtils.value(null);
}
- return FutureUtils.value(null);
}
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 1911725..5cb6e5b 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertTrue;
+import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.apache.pulsar.tests.TestRetrySupport;
@@ -52,7 +53,7 @@ public abstract class BaseMetadataStoreTest extends
TestRetrySupport {
// Supplier<String> lambda is used for providing the value.
return new Object[][] {
{ "ZooKeeper", stringSupplier(() -> zks.getConnectionString())
},
- { "Memory", stringSupplier(() -> "memory://local") },
+ { "Memory", stringSupplier(() -> "memory://" +
UUID.randomUUID()) },
};
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java
new file mode 100644
index 0000000..0016d99
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LocalMemoryMetadataStoreTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.testng.annotations.Test;
+
+public class LocalMemoryMetadataStoreTest {
+
+ @Test
+ public void testPrivateInstance() throws Exception {
+ @Cleanup
+ MetadataStore store1 = MetadataStoreFactory.create("memory://local",
+ MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ MetadataStore store2 = MetadataStoreFactory.create("memory://local",
+ MetadataStoreConfig.builder().build());
+
+ store1.put("/test", "value".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
+
+ assertTrue(store1.exists("/test").join());
+ assertFalse(store2.exists("/test").join());
+ }
+
+ @Test
+ public void testSharedInstance() throws Exception {
+ String url = "memory://" + UUID.randomUUID();
+
+ @Cleanup
+ MetadataStore store1 = MetadataStoreFactory.create(url,
+ MetadataStoreConfig.builder().build());
+
+ @Cleanup
+ MetadataStore store2 = MetadataStoreFactory.create(url,
+ MetadataStoreConfig.builder().build());
+
+ store1.put("/test", "value".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
+
+ assertTrue(store1.exists("/test").join());
+ assertTrue(store2.exists("/test").join());
+
+ store2.delete("/test", Optional.empty()).join();
+
+ assertFalse(store1.exists("/test").join());
+ assertFalse(store2.exists("/test").join());
+ }
+}