This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ac0e15d PIP-45: Add MetadataCache implementation (#9148)
ac0e15d is described below
commit ac0e15d24e2a40a20d6889bf477ae9bd52bb37eb
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jan 8 16:57:21 2021 -0800
PIP-45: Add MetadataCache implementation (#9148)
### Motivation
Third part of implementation for PIP-45:
* Added listener that can be added to the store
* Defined `MetadataCache` that acts as an object cache on top of the data
store
* Added `LocalMemoryMetadataStore` as a reference implementation that can
be used in tests.
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 2 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +-
.../bookkeeper/mledger/impl/MetaStoreImplTest.java | 2 +-
pulsar-metadata/pom.xml | 6 +
.../apache/pulsar/metadata/api/MetadataStore.java | 33 ++-
.../metadata/api/MetadataStoreException.java | 75 +++++-
.../pulsar/metadata/api/MetadataStoreFactory.java | 9 +-
...tadataStoreException.java => Notification.java} | 29 +--
...taStoreException.java => NotificationType.java} | 33 +--
.../pulsar/metadata/cache/MetadataCache.java | 133 +++++++++++
.../impl/JSONMetadataSerdeSimpleType.java} | 37 ++-
.../impl/JSONMetadataSerdeTypeRef.java} | 37 ++-
.../metadata/cache/impl/MetadataCacheImpl.java | 246 +++++++++++++++++++
.../impl/MetadataSerde.java} | 29 +--
.../metadata/impl/AbstractMetadataStore.java | 162 +++++++++++++
.../metadata/impl/LocalMemoryMetadataStore.java | 192 +++++++++++++++
.../impl/{zookeeper => }/ZKMetadataStore.java | 94 ++++++--
.../pulsar/metadata/BaseMetadataStoreTest.java | 61 +++++
.../apache/pulsar/metadata/MetadataCacheTest.java | 261 +++++++++++++++++++++
.../apache/pulsar/metadata/MetadataStoreTest.java | 125 +++++++---
20 files changed, 1384 insertions(+), 184 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 7efb471..ab28046 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -82,7 +82,7 @@ import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4406df6..18bd70c 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -109,7 +109,7 @@ import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
index e41c550..98a0433 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
@@ -32,7 +32,7 @@ import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index fbcd511..dd9c668 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -34,6 +34,12 @@
<dependencies>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 16b0496..3cafa17 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -18,14 +18,17 @@
*/
package org.apache.pulsar.metadata.api;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.Beta;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.cache.MetadataCache;
/**
* Metadata store client interface.
@@ -56,7 +59,7 @@ public interface MetadataStore extends AutoCloseable {
* If the path itself does not exist, it will return an empty list.
*
* @param path
- * webSocketProxyEnabled
+ * the path of the key to get from the store
* @return a future to track the async request
*/
CompletableFuture<List<String>> getChildren(String path);
@@ -109,4 +112,32 @@ public interface MetadataStore extends AutoCloseable {
* @return a future to track the async request
*/
CompletableFuture<Void> delete(String path, Optional<Long>
expectedVersion);
+
+ /**
+ * Register a listener that will be called on changes in the underlying
store.
+ *
+ * @param listener
+ * a consumer of notifications
+ */
+ void registerListener(Consumer<Notification> listener);
+
+ /**
+ * Create a metadata cache specialized for a specific class.
+ *
+ * @param <T>
+ * @param clazz
+ * the class type to be used for serialization/deserialization
+ * @return the metadata cache object
+ */
+ <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+
+ /**
+ * Create a metadata cache specialized for a specific class.
+ *
+ * @param <T>
+ * @param typeRef
+ * the type ref description to be used for
serialization/deserialization
+ * @return the metadata cache object
+ */
+ <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
index ab981df..c5fdb27 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.metadata.api;
import java.io.IOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
/**
* Generic metadata store exception.
@@ -29,13 +31,38 @@ public class MetadataStoreException extends IOException {
super(t);
}
+ public MetadataStoreException(String msg) {
+ super(msg);
+ }
+
/**
- * Value not found in store.
+ * Key not found in store.
*/
public static class NotFoundException extends MetadataStoreException {
+ public NotFoundException() {
+ super((Throwable)null);
+ }
+
public NotFoundException(Throwable t) {
super(t);
}
+
+ public NotFoundException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Key was already in store.
+ */
+ public static class AlreadyExistsException extends MetadataStoreException {
+ public AlreadyExistsException(Throwable t) {
+ super(t);
+ }
+
+ public AlreadyExistsException(String msg) {
+ super(msg);
+ }
}
/**
@@ -45,5 +72,51 @@ public class MetadataStoreException extends IOException {
public BadVersionException(Throwable t) {
super(t);
}
+
+ public BadVersionException(String msg) {
+ super(msg);
+ }
+ }
+
+ /**
+ * Failed to de-serialize the metadata.
+ */
+ public static class ContentDeserializationException extends
MetadataStoreException {
+ public ContentDeserializationException(Throwable t) {
+ super(t);
+ }
+
+ public ContentDeserializationException(String msg) {
+ super(msg);
+ }
+ }
+
+ public static MetadataStoreException unwrap(Throwable t) {
+ if (t instanceof MetadataStoreException) {
+ return (MetadataStoreException) t;
+ } else if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (t instanceof InterruptedException) {
+ return new MetadataStoreException(t);
+ } else if (!(t instanceof ExecutionException) && !(t instanceof
CompletionException)) {
+ // Generic exception
+ return new MetadataStoreException(t);
+ }
+
+ // Unwrap the exception to keep the same exception type but a stack
trace that includes the application calling
+ // site
+ Throwable cause = t.getCause();
+ String msg = cause.getMessage();
+ if (cause instanceof NotFoundException) {
+ return new NotFoundException(msg);
+ } else if (cause instanceof AlreadyExistsException) {
+ return new AlreadyExistsException(msg);
+ } else if (cause instanceof BadVersionException) {
+ return new BadVersionException(msg);
+ } else if (cause instanceof ContentDeserializationException) {
+ return new ContentDeserializationException(msg);
+ } else {
+ return new MetadataStoreException(t);
+ }
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
index d44b666..efd238e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
@@ -22,7 +22,8 @@ import java.io.IOException;
import lombok.experimental.UtilityClass;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
/**
* Factory class for {@link MetadataStore}.
@@ -41,6 +42,10 @@ public class MetadataStoreFactory {
* if the metadata store initialization fails
*/
public static MetadataStore create(String metadataURL, MetadataStoreConfig
metadataStoreConfig) throws IOException {
- return new ZKMetadataStore(metadataURL, metadataStoreConfig);
+ if (metadataURL.startsWith("memory://")) {
+ return new LocalMemoryMetadataStore(metadataURL,
metadataStoreConfig);
+ } else {
+ return new ZKMetadataStore(metadataURL, metadataStoreConfig);
+ }
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notification.java
similarity index 58%
copy from
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notification.java
index ab981df..d27bfd4 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notification.java
@@ -18,32 +18,17 @@
*/
package org.apache.pulsar.metadata.api;
-import java.io.IOException;
-
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
-
- public MetadataStoreException(Throwable t) {
- super(t);
- }
+import lombok.Data;
+@Data
+public final class Notification {
/**
- * Value not found in store.
+ * The type of the event being notified.
*/
- public static class NotFoundException extends MetadataStoreException {
- public NotFoundException(Throwable t) {
- super(t);
- }
- }
+ private final NotificationType type;
/**
- * Unsuccessful update due to mismatched expected version.
+ * Path of the kev/value pair interested by the notification
*/
- public static class BadVersionException extends MetadataStoreException {
- public BadVersionException(Throwable t) {
- super(t);
- }
- }
+ private final String path;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/NotificationType.java
similarity index 56%
copy from
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/NotificationType.java
index ab981df..bbc1dfd 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/NotificationType.java
@@ -18,32 +18,9 @@
*/
package org.apache.pulsar.metadata.api;
-import java.io.IOException;
-
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
-
- public MetadataStoreException(Throwable t) {
- super(t);
- }
-
- /**
- * Value not found in store.
- */
- public static class NotFoundException extends MetadataStoreException {
- public NotFoundException(Throwable t) {
- super(t);
- }
- }
-
- /**
- * Unsuccessful update due to mismatched expected version.
- */
- public static class BadVersionException extends MetadataStoreException {
- public BadVersionException(Throwable t) {
- super(t);
- }
- }
+public enum NotificationType {
+ Created,
+ Modified,
+ ChildrenChanged,
+ Deleted
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/MetadataCache.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/MetadataCache.java
new file mode 100644
index 0000000..17aa6b3
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/MetadataCache.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.cache;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+
+/**
+ * Represent the caching layer access for a specific type of objects.
+ */
+public interface MetadataCache<T> {
+
+ /**
+ * Tries to fetch one item from the cache or fallback to the store if not
present.
+ * <p>
+ * If the key is not found, the {@link Optional} will be empty.
+ *
+ * @param path
+ * the path of the object in the metadata store
+ * @return a future to track the completion of the operation
+ */
+ CompletableFuture<Optional<T>> get(String path);
+
+ /**
+ * Check if an object is present in cache without triggering a load from
the metadata store.
+ *
+ * @param path
+ * the path of the object in the metadata store
+ * @return the cached object or an empty {@link Optional} is the cache
doesn't have the object
+ */
+ Optional<T> getIfCached(String path);
+
+ /**
+ * Return all the nodes (lexicographically sorted) that are children to
the specific path.
+ *
+ * If the path itself does not exist, it will return an empty list.
+ *
+ * @param path
+ * the path of the key to get from the store
+ * @return a future to track the async request
+ */
+ CompletableFuture<List<String>> getChildren(String path);
+
+ /**
+ * Read whether a specific path exists.
+ *
+ * Note: In case of keys with multiple levels (eg: '/a/b/c'), checking the
existence of a parent (eg. '/a') might
+ * not necessarily return true, unless the key had been explicitly created.
+ *
+ * @param path
+ * the path of the key to check on the store
+ * @return a future to track the async request
+ */
+ CompletableFuture<Boolean> exists(String path);
+
+ /**
+ * Perform an atomic read-modify-update of the value.
+ * <p>
+ * The modify function can potentially be called multiple times if there
are concurrent updates happening.
+ * <p>
+ * If the object does not exist yet, the <code>modifyFunction</code> will
get passed an {@link Optional#empty()}
+ * object.
+ *
+ * @param path
+ * the path of the object in the metadata store
+ * @param modifyFunction
+ * a function that will be passed the current value and returns
a modified value to be stored
+ * @return a future to track the completion of the operation
+ */
+ CompletableFuture<Void> readModifyUpdateOrCreate(String path,
Function<Optional<T>, T> modifyFunction);
+
+ /**
+ * Perform an atomic read-modify-update of the value.
+ * <p>
+ * The modify function can potentially be called multiple times if there
are concurrent updates happening.
+ *
+ * @param path
+ * the path of the object in the metadata store
+ * @param modifyFunction
+ * a function that will be passed the current value and returns
a modified value to be stored
+ * @return a future to track the completion of the operation
+ */
+ CompletableFuture<Void> readModifyUpdate(String path, Function<T, T>
modifyFunction);
+
+ /**
+ * Create a new object in the metadata store.
+ * <p>
+ * This operation will make sure to keep the cache consistent.
+ *
+ * @param path
+ * the path of the object in the metadata store
+ * @param value
+ * the object to insert in metadata store
+ * @return a future to track the completion of the operation
+ * @throws AlreadyExistsException
+ * If the object is already present.
+ */
+ CompletableFuture<Void> create(String path, T value);
+
+ /**
+ * Delete an object from the metadata store.
+ * <p>
+ * This operation will make sure to keep the cache consistent.
+ *
+ * @param path
+ * the path of the object in the metadata store
+ * @return a future to track the completion of the operation
+ * @throws NotFoundException
+ * if the object is not present in the metadata store.
+ */
+ CompletableFuture<Void> delete(String path);
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
similarity index 56%
copy from
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
index ab981df..d3f071b 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
@@ -16,34 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.metadata.api;
+package org.apache.pulsar.metadata.cache.impl;
+
+import com.fasterxml.jackson.databind.JavaType;
import java.io.IOException;
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+public class JSONMetadataSerdeSimpleType<T> implements MetadataSerde<T> {
+
+ private final JavaType typeRef;
- public MetadataStoreException(Throwable t) {
- super(t);
+ public JSONMetadataSerdeSimpleType(JavaType typeRef) {
+ this.typeRef = typeRef;
}
- /**
- * Value not found in store.
- */
- public static class NotFoundException extends MetadataStoreException {
- public NotFoundException(Throwable t) {
- super(t);
- }
+ @Override
+ public byte[] serialize(T value) throws IOException {
+ return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
}
- /**
- * Unsuccessful update due to mismatched expected version.
- */
- public static class BadVersionException extends MetadataStoreException {
- public BadVersionException(Throwable t) {
- super(t);
- }
+ @Override
+ public T deserialize(byte[] content) throws IOException {
+ return ObjectMapperFactory.getThreadLocal().readValue(content,
typeRef);
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeTypeRef.java
similarity index 55%
copy from
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeTypeRef.java
index ab981df..4b37898 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeTypeRef.java
@@ -16,34 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.metadata.api;
+package org.apache.pulsar.metadata.cache.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+public class JSONMetadataSerdeTypeRef<T> implements MetadataSerde<T> {
+
+ private final TypeReference<T> typeRef;
- public MetadataStoreException(Throwable t) {
- super(t);
+ public JSONMetadataSerdeTypeRef(TypeReference<T> typeRef) {
+ this.typeRef = typeRef;
}
- /**
- * Value not found in store.
- */
- public static class NotFoundException extends MetadataStoreException {
- public NotFoundException(Throwable t) {
- super(t);
- }
+ @Override
+ public byte[] serialize(T value) throws IOException {
+ return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
}
- /**
- * Unsuccessful update due to mismatched expected version.
- */
- public static class BadVersionException extends MetadataStoreException {
- public BadVersionException(Throwable t) {
- super(t);
- }
+ @Override
+ public T deserialize(byte[] content) throws IOException {
+ return ObjectMapperFactory.getThreadLocal().readValue(content,
typeRef);
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
new file mode 100644
index 0000000..958036a
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.cache.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.cache.MetadataCache;
+
+public class MetadataCacheImpl<T> implements MetadataCache<T>,
Consumer<Notification> {
+
+ private static final long CACHE_REFRESH_TIME_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+
+ private final MetadataStore store;
+ private final MetadataSerde<T> serde;
+
+ private final AsyncLoadingCache<String, Optional<Entry<T, Stat>>> objCache;
+
+ public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef) {
+ this(store, new JSONMetadataSerdeTypeRef<>(typeRef));
+ }
+
+ public MetadataCacheImpl(MetadataStore store, JavaType type) {
+ this(store, new JSONMetadataSerdeSimpleType<>(type));
+ }
+
+ private MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
+ this.store = store;
+ this.serde = serde;
+
+ this.objCache = Caffeine.newBuilder()
+ .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS,
TimeUnit.MILLISECONDS)
+ .buildAsync(new AsyncCacheLoader<String, Optional<Entry<T,
Stat>>>() {
+ @Override
+ public CompletableFuture<Optional<Entry<T, Stat>>>
asyncLoad(String key, Executor executor) {
+ return readValueFromStore(key);
+ }
+
+ @Override
+ public CompletableFuture<Optional<Entry<T, Stat>>>
asyncReload(String key,
+ Optional<Entry<T, Stat>> oldValue, Executor
executor) {
+ return readValueFromStore(key);
+ }
+ });
+ }
+
+ private CompletableFuture<Optional<Entry<T, Stat>>>
readValueFromStore(String path) {
+ return store.get(path)
+ .thenCompose(optRes -> {
+ if (!optRes.isPresent()) {
+ return FutureUtils.value(Optional.empty());
+ }
+
+ try {
+ T obj = serde.deserialize(optRes.get().getValue());
+ return FutureUtils
+ .value(Optional.of(new SimpleImmutableEntry<T,
Stat>(obj, optRes.get().getStat())));
+ } catch (Throwable t) {
+ return FutureUtils.exception(new
ContentDeserializationException(t));
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Optional<T>> get(String path) {
+ return objCache.get(path)
+ .thenApply(optRes -> optRes.map(Entry::getKey));
+ }
+
+ @Override
+ public Optional<T> getIfCached(String path) {
+ CompletableFuture<Optional<Map.Entry<T, Stat>>> future =
objCache.getIfPresent(path);
+ if (future != null && future.isDone() &&
!future.isCompletedExceptionally()) {
+ return future.join().map(Entry::getKey);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> readModifyUpdateOrCreate(String path,
Function<Optional<T>, T> modifyFunction) {
+ return objCache.get(path)
+ .thenCompose(optEntry -> {
+ Optional<T> currentValue;
+ long expectedVersion;
+
+ if (optEntry.isPresent()) {
+ currentValue = Optional.of(optEntry.get().getKey());
+ expectedVersion =
optEntry.get().getValue().getVersion();
+ } else {
+ currentValue = Optional.empty();
+ expectedVersion = -1;
+ }
+
+ T newValueObj;
+ byte[] newValue;
+ try {
+ newValueObj = modifyFunction.apply(currentValue);
+ newValue = serde.serialize(newValueObj);
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(stat -> {
+ // Make sure we have the value cached before the
operation is completed
+ objCache.put(path,
+ FutureUtils.value(Optional.of(new
SimpleImmutableEntry<T, Stat>(newValueObj, stat))));
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> readModifyUpdate(String path, Function<T,
T> modifyFunction) {
+ return objCache.get(path)
+ .thenCompose(optEntry -> {
+ if (!optEntry.isPresent()) {
+ return FutureUtils.exception(new
NotFoundException(""));
+ }
+
+ Map.Entry<T, Stat> entry = optEntry.get();
+ T currentValue = entry.getKey();
+ long expectedVersion =
optEntry.get().getValue().getVersion();
+
+ T newValueObj;
+ byte[] newValue;
+ try {
+ newValueObj = modifyFunction.apply(currentValue);
+ newValue = serde.serialize(newValueObj);
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ return store.put(path, newValue,
Optional.of(expectedVersion)).thenAccept(stat -> {
+ // Make sure we have the value cached before the
operation is completed
+ objCache.put(path,
+ FutureUtils.value(Optional.of(new
SimpleImmutableEntry<T, Stat>(newValueObj, stat))));
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> create(String path, T value) {
+ byte[] content;
+ try {
+ content = serde.serialize(value);
+ } catch (Throwable t) {
+ return FutureUtils.exception(t);
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ store.put(path, content, Optional.of(-1L))
+ .thenAccept(stat -> {
+ // Make sure we have the value cached before the operation
is completed
+ objCache.put(path, FutureUtils.value(Optional.of(new
SimpleImmutableEntry<T, Stat>(value, stat))));
+ future.complete(null);
+ }).exceptionally(ex -> {
+ if (ex.getCause() instanceof BadVersionException) {
+ // Use already exists exception to provide more
self-explanatory error message
+ future.completeExceptionally(new
AlreadyExistsException(ex.getCause()));
+ } else {
+ future.completeExceptionally(ex.getCause());
+ }
+ return null;
+ });
+
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> delete(String path) {
+ return store.delete(path, Optional.empty())
+ .thenAccept(v -> {
+ // Mark in the cache that the object was removed
+ objCache.put(path, FutureUtils.value(Optional.empty()));
+ });
+ }
+
+ @Override
+ public CompletableFuture<Boolean> exists(String path) {
+ return store.exists(path);
+ }
+
+ @Override
+ public CompletableFuture<List<String>> getChildren(String path) {
+ return store.getChildren(path);
+ }
+
+ @Override
+ public void accept(Notification t) {
+ String path = t.getPath();
+ switch (t.getType()) {
+ case Created:
+ case Modified:
+ if (objCache.synchronous().getIfPresent(path) != null) {
+ // Trigger background refresh of the cached item
+ objCache.synchronous().refresh(path);
+ }
+ break;
+
+ case Deleted:
+ objCache.synchronous().invalidate(path);
+ break;
+
+ default:
+ break;
+ }
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataSerde.java
similarity index 55%
copy from
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataSerde.java
index ab981df..a1a3b03 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataSerde.java
@@ -16,34 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.metadata.api;
+package org.apache.pulsar.metadata.cache.impl;
import java.io.IOException;
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
-
- public MetadataStoreException(Throwable t) {
- super(t);
- }
+public interface MetadataSerde<T> {
- /**
- * Value not found in store.
- */
- public static class NotFoundException extends MetadataStoreException {
- public NotFoundException(Throwable t) {
- super(t);
- }
- }
+ byte[] serialize(T value) throws IOException;
- /**
- * Unsuccessful update due to mismatched expected version.
- */
- public static class BadVersionException extends MetadataStoreException {
- public BadVersionException(Throwable t) {
- super(t);
- }
- }
+ T deserialize(byte[] content) throws IOException;
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
new file mode 100644
index 0000000..f6979d5
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.cache.MetadataCache;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+
+@Slf4j
+public abstract class AbstractMetadataStore implements MetadataStore,
Consumer<Notification> {
+
+ private static final long CACHE_REFRESH_TIME_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+
+ private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new
CopyOnWriteArrayList<>();
+ protected final ExecutorService executor;
+ private final AsyncLoadingCache<String, List<String>> childrenCache;
+ private final AsyncLoadingCache<String, Boolean> existsCache;
+ private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches =
new CopyOnWriteArrayList<>();
+
+ protected abstract CompletableFuture<List<String>>
getChildrenFromStore(String path);
+
+ protected abstract CompletableFuture<Boolean> existsFromStore(String path);
+
+ protected AbstractMetadataStore() {
+ this.executor = Executors
+ .newSingleThreadExecutor(new
DefaultThreadFactory("metadata-store"));
+ registerListener(this);
+
+ this.childrenCache = Caffeine.newBuilder()
+ .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS,
TimeUnit.MILLISECONDS)
+ .buildAsync(new AsyncCacheLoader<String, List<String>>() {
+ @Override
+ public CompletableFuture<List<String>> asyncLoad(String
key, Executor executor) {
+ return getChildrenFromStore(key);
+ }
+
+ @Override
+ public CompletableFuture<List<String>> asyncReload(String
key, List<String> oldValue,
+ Executor executor) {
+ return getChildrenFromStore(key);
+ }
+ });
+
+ this.existsCache = Caffeine.newBuilder()
+ .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS,
TimeUnit.MILLISECONDS)
+ .buildAsync(new AsyncCacheLoader<String, Boolean>() {
+ @Override
+ public CompletableFuture<Boolean> asyncLoad(String key,
Executor executor) {
+ return existsFromStore(key);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> asyncReload(String key,
Boolean oldValue,
+ Executor executor) {
+ return existsFromStore(key);
+ }
+ });
+ }
+
+ @Override
+ public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+ MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
+ TypeFactory.defaultInstance().constructSimpleType(clazz,
null));
+ metadataCaches.add(metadataCache);
+ return metadataCache;
+ }
+
+ @Override
+ public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+ MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
typeRef);
+ metadataCaches.add(metadataCache);
+ return metadataCache;
+ }
+
+ @Override
+ public final CompletableFuture<List<String>> getChildren(String path) {
+ return childrenCache.get(path);
+ }
+
+ @Override
+ public final CompletableFuture<Boolean> exists(String path) {
+ return existsCache.get(path);
+ }
+
+ @Override
+ public void registerListener(Consumer<Notification> listener) {
+ listeners.add(listener);
+ }
+
+ protected void receivedNotification(Notification notification) {
+ executor.execute(() -> {
+ listeners.forEach(listener -> {
+ try {
+ listener.accept(notification);
+ } catch (Throwable t) {
+ log.error("Failed to process metadata store notification",
t);
+ }
+ });
+ });
+ }
+
+ @Override
+ public void accept(Notification n) {
+ String path = n.getPath();
+ NotificationType type = n.getType();
+
+ if (type == NotificationType.Created || type ==
NotificationType.Deleted) {
+ existsCache.synchronous().invalidate(path);
+ }
+
+ if (type == NotificationType.ChildrenChanged) {
+ childrenCache.synchronous().invalidate(path);
+ }
+
+ if (type == NotificationType.Created || type ==
NotificationType.Deleted || type == NotificationType.Modified) {
+ metadataCaches.forEach(c -> c.accept(n));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ executor.shutdownNow();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
new file mode 100644
index 0000000..83fdb31
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import lombok.Data;
+
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+
+public class LocalMemoryMetadataStore extends AbstractMetadataStore implements
MetadataStore {
+
+ @Data
+ private static class Value {
+ final long version;
+ final byte[] data;
+ final long createdTimestamp;
+ final long modifiedTimestamp;
+ }
+
+ private final NavigableMap<String, Value> map;
+
+ public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig) throws IOException {
+ map = new TreeMap<>();
+ }
+
+ @Override
+ public synchronized CompletableFuture<Optional<GetResult>> get(String
path) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
+
+ Value v = map.get(path);
+ if (v != null) {
+ return FutureUtils.value(
+ Optional.of(new GetResult(v.data, new Stat(v.version,
v.createdTimestamp, v.modifiedTimestamp))));
+ } else {
+ return FutureUtils.value(Optional.empty());
+ }
+ }
+
+ @Override
+ public synchronized CompletableFuture<List<String>>
getChildrenFromStore(String path) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
+
+ String firstKey = path.equals("/") ? path : path + "/";
+ String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is
lexicographically just after '/'
+
+ List<String> children = new ArrayList<>();
+ map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+ String relativePath = key.replace(firstKey, "");
+ if (!relativePath.contains("/")) {
+ // Only return first-level children
+ children.add(relativePath);
+ }
+ });
+
+ return FutureUtils.value(children);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Boolean> existsFromStore(String
path) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
+
+ Value v = map.get(path);
+ return FutureUtils.value(v != null ? true : false);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Stat> put(String path, byte[] data,
Optional<Long> optExpectedVersion) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
+
+ boolean hasVersion = optExpectedVersion.isPresent();
+ int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
+
+ long now = System.currentTimeMillis();
+
+ if (hasVersion && expectedVersion == -1) {
+ Value newValue = new Value(0, data, now, now);
+ Value existingValue = map.putIfAbsent(path, newValue);
+ if (existingValue != null) {
+ return FutureUtils.exception(new BadVersionException(""));
+ } else {
+ receivedNotification(new
Notification(NotificationType.Created, path));
+ String parent = parent(path);
+ if (parent != null) {
+ receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ }
+ return FutureUtils.value(new Stat(0, now, now));
+ }
+ } else {
+ Value existingValue = map.get(path);
+ long existingVersion = existingValue != null ?
existingValue.version : -1;
+ if (hasVersion && expectedVersion != existingVersion) {
+ return FutureUtils.exception(new BadVersionException(""));
+ } else {
+ long newVersion = existingValue != null ?
existingValue.version + 1 : 0;
+ long createdTimestamp = existingValue != null ?
existingValue.createdTimestamp : now;
+ Value newValue = new Value(newVersion, data, createdTimestamp,
now);
+ map.put(path, newValue);
+
+ NotificationType type = existingValue == null ?
NotificationType.Created : NotificationType.Modified;
+ receivedNotification(new Notification(type, path));
+ if (type == NotificationType.Created) {
+ String parent = parent(path);
+ if (parent != null) {
+ receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ }
+ }
+ return FutureUtils
+ .value(new Stat(newValue.version,
newValue.createdTimestamp, newValue.modifiedTimestamp));
+ }
+ }
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> delete(String path,
Optional<Long> optExpectedVersion) {
+ if (!isValidPath(path)) {
+ return FutureUtils.exception(new MetadataStoreException(""));
+ }
+
+ Value value = map.get(path);
+ if (value == null) {
+ return FutureUtils.exception(new NotFoundException(""));
+ } else if (value != null && optExpectedVersion.isPresent() &&
optExpectedVersion.get() != value.version) {
+ return FutureUtils.exception(new BadVersionException(""));
+ } else {
+ map.remove(path);
+ receivedNotification(new Notification(NotificationType.Deleted,
path));
+ String parent = parent(path);
+ if (parent != null) {
+ receivedNotification(new
Notification(NotificationType.ChildrenChanged, parent));
+ }
+ return FutureUtils.value(null);
+ }
+ }
+
+ private static boolean isValidPath(String path) {
+ if (path == null || !path.startsWith("/")) {
+ return false;
+ }
+
+ return path != "/" || !path.endsWith("/");
+ }
+
+ private static String parent(String path) {
+ int idx = path.lastIndexOf('/');
+ if (idx <= 0) {
+ // No parent
+ return null;
+ }
+
+ return path.substring(0, idx);
+ }
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
similarity index 78%
rename from
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
rename to
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index e35b583..4b8135b 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -16,15 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.metadata.impl.zookeeper;
+package org.apache.pulsar.metadata.impl;
+
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
@@ -35,22 +37,22 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class ZKMetadataStore implements MetadataStore {
+@Slf4j
+public class ZKMetadataStore extends AbstractMetadataStore implements
MetadataStore, Watcher {
private final boolean isZkManaged;
private final ZooKeeper zkc;
- private final ExecutorService executor;
public ZKMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig) throws IOException {
try {
@@ -62,15 +64,12 @@ public class ZKMetadataStore implements MetadataStore {
} catch (KeeperException | InterruptedException e) {
throw new IOException(e);
}
-
- this.executor = Executors.newSingleThreadExecutor(new
DefaultThreadFactory("zk-metadata-store-callback"));
}
@VisibleForTesting
public ZKMetadataStore(ZooKeeper zkc) {
this.isZkManaged = false;
this.zkc = zkc;
- this.executor = Executors.newSingleThreadExecutor(new
DefaultThreadFactory("zk-metadata-store-callback"));
}
@Override
@@ -78,12 +77,29 @@ public class ZKMetadataStore implements MetadataStore {
CompletableFuture<Optional<GetResult>> future = new
CompletableFuture<>();
try {
- zkc.getData(path, null, (rc, path1, ctx, data, stat) -> {
+ zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
executor.execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
future.complete(Optional.of(new GetResult(data,
getStat(stat))));
} else if (code == Code.NONODE) {
+ // Place a watch on the non-existing node, so we'll
get notified
+ // when it gets created and we can invalidate the
negative cache.
+ existsFromStore(path).thenAccept(exists -> {
+ if (exists) {
+ get(path).thenAccept(c -> future.complete(c))
+ .exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ // Z-node does not exist
+ future.complete(Optional.empty());
+ }
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
future.complete(Optional.empty());
} else {
future.completeExceptionally(getException(code, path));
@@ -98,11 +114,11 @@ public class ZKMetadataStore implements MetadataStore {
}
@Override
- public CompletableFuture<List<String>> getChildren(String path) {
+ public CompletableFuture<List<String>> getChildrenFromStore(String path) {
CompletableFuture<List<String>> future = new CompletableFuture<>();
try {
- zkc.getChildren(path, null, (rc, path1, ctx, children) -> {
+ zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
executor.execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
@@ -114,9 +130,9 @@ public class ZKMetadataStore implements MetadataStore {
// been created after the call to getChildren, but
before the call to exists().
// If this is the case, exists will return true, and
we just call getChildren
// again.
- exists(path).thenAccept(exists -> {
+ existsFromStore(path).thenAccept(exists -> {
if (exists) {
- getChildren(path).thenAccept(c ->
future.complete(c)).exceptionally(ex -> {
+ getChildrenFromStore(path).thenAccept(c ->
future.complete(c)).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
@@ -128,8 +144,6 @@ public class ZKMetadataStore implements MetadataStore {
future.completeExceptionally(ex);
return null;
});
-
- future.complete(Collections.emptyList());
} else {
future.completeExceptionally(getException(code, path));
}
@@ -143,11 +157,11 @@ public class ZKMetadataStore implements MetadataStore {
}
@Override
- public CompletableFuture<Boolean> exists(String path) {
+ public CompletableFuture<Boolean> existsFromStore(String path) {
CompletableFuture<Boolean> future = new CompletableFuture<>();
try {
- zkc.exists(path, null, (rc, path1, ctx, stat) -> {
+ zkc.exists(path, this, (rc, path1, ctx, stat) -> {
executor.execute(() -> {
Code code = Code.get(rc);
if (code == Code.OK) {
@@ -250,7 +264,7 @@ public class ZKMetadataStore implements MetadataStore {
if (isZkManaged) {
zkc.close();
}
- executor.shutdownNow();
+ super.close();
}
private static Stat getStat(org.apache.zookeeper.data.Stat zkStat) {
@@ -269,4 +283,40 @@ public class ZKMetadataStore implements MetadataStore {
return new MetadataStoreException(ex);
}
}
+
+ @Override
+ public void process(WatchedEvent event) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received ZK watch : {}", event);
+ }
+ String path = event.getPath();
+ if (path == null) {
+ // Ignore Session events
+ return;
+ }
+
+ NotificationType type;
+ switch (event.getType()) {
+ case NodeCreated:
+ type = NotificationType.Created;
+ break;
+
+ case NodeDataChanged:
+ type = NotificationType.Modified;
+ break;
+
+ case NodeChildrenChanged:
+ type = NotificationType.ChildrenChanged;
+ break;
+
+ case NodeDeleted:
+ type = NotificationType.Deleted;
+ break;
+
+ default:
+ return;
+ }
+
+ receivedNotification(new Notification(type, event.getPath()));
+ }
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
new file mode 100644
index 0000000..7319c20
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.CompletionException;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+
+public abstract class BaseMetadataStoreTest {
+ protected TestZKServer zks;
+
+ @BeforeClass
+ void setup() throws Exception {
+ zks = new TestZKServer();
+ }
+
+ @AfterClass
+ void teardown() throws Exception {
+ zks.close();
+ }
+
+ @DataProvider(name = "impl")
+ public Object[][] implementations() {
+ return new Object[][] {
+ { "ZooKeeper", zks.getConnectionString() },
+ { "Memory", "memory://local" },
+ };
+ }
+
+ protected String newKey() {
+ return "/key-" + System.nanoTime();
+ }
+
+ static void assertException(CompletionException e, Class<?> clazz) {
+ assertException(e.getCause(), clazz);
+ }
+
+ static void assertException(Throwable t, Class<?> clazz) {
+ assertTrue(clazz.isInstance(t), String.format("Exception %s is not of
type %s", t.getClass(), clazz));
+ }
+}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
new file mode 100644
index 0000000..44de394
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletionException;
+
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.cache.MetadataCache;
+import org.testng.annotations.Test;
+
+public class MetadataCacheTest extends BaseMetadataStoreTest {
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ static class MyClass {
+ String a;
+ int b;
+ }
+
+ @Test(dataProvider = "impl")
+ public void emptyCacheTest(String provider, String url) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ assertEquals(objCache.getIfCached("/non-existing-key"),
Optional.empty());
+ assertEquals(objCache.getIfCached("/non-existing-key/child"),
Optional.empty());
+
+ assertEquals(objCache.get("/non-existing-key").join(),
Optional.empty());
+ assertEquals(objCache.get("/non-existing-key/child").join(),
Optional.empty());
+
+ try {
+ objCache.delete("/non-existing-key").join();
+ fail("should have failed");
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(), NotFoundException.class);
+ }
+
+ try {
+ objCache.delete("/non-existing-key/child").join();
+ fail("should have failed");
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(), NotFoundException.class);
+ }
+ }
+
+ @Test(dataProvider = "impl")
+ public void insertionDeletionWitGenericType(String provider, String url)
throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<Map<String, String>> objCache = store
+ .getMetadataCache(new TypeReference<Map<String, String>>() {
+ });
+
+ String key1 = newKey();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+
+ Map<String, String> v = new TreeMap<>();
+ v.put("a", "1");
+ v.put("b", "2");
+ objCache.create(key1, v).join();
+
+ assertEquals(objCache.getIfCached(key1), Optional.of(v));
+ assertEquals(objCache.get(key1).join(), Optional.of(v));
+
+ objCache.delete(key1).join();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+ assertEquals(objCache.get(key1).join(), Optional.empty());
+ }
+
+ @Test(dataProvider = "impl")
+ public void insertionDeletion(String provider, String url) throws
Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ String key1 = newKey();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+
+ MyClass value1 = new MyClass("a", 1);
+ objCache.create(key1, value1).join();
+
+ MyClass value2 = new MyClass("a", 2);
+
+ try {
+ objCache.create(key1, value2).join();
+ fail("should have failed to create");
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(),
AlreadyExistsException.class);
+ }
+
+ assertEquals(objCache.getIfCached(key1), Optional.of(value1));
+ assertEquals(objCache.get(key1).join(), Optional.of(value1));
+
+ objCache.delete(key1).join();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+ assertEquals(objCache.get(key1).join(), Optional.empty());
+ }
+
+ @Test(dataProvider = "impl")
+ public void insertionOutsideCache(String provider, String url) throws
Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ String key1 = newKey();
+
+ MyClass value1 = new MyClass("a", 1);
+ store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1),
Optional.of(-1L)).join();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+ assertEquals(objCache.get(key1).join(), Optional.of(value1));
+ }
+
+ @Test(dataProvider = "impl")
+ public void insertionOutsideCacheWithGenericType(String provider, String
url) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+ MetadataCache<Map<String, String>> objCache = store
+ .getMetadataCache(new TypeReference<Map<String, String>>() {
+ });
+
+ String key1 = newKey();
+
+ Map<String, String> v = new TreeMap<>();
+ v.put("a", "1");
+ v.put("b", "2");
+ store.put(key1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(v),
Optional.of(-1L)).join();
+
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+ assertEquals(objCache.get(key1).join(), Optional.of(v));
+ }
+
+ @Test(dataProvider = "impl")
+ public void invalidJsonContent(String provider, String url) throws
Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ String key1 = newKey();
+
+ store.put(key1, "-------".getBytes(), Optional.of(-1L)).join();
+
+ try {
+ objCache.get(key1).join();
+ fail("should have failed to deserialize");
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(),
ContentDeserializationException.class);
+ }
+ assertEquals(objCache.getIfCached(key1), Optional.empty());
+ }
+
+ @Test(dataProvider = "impl")
+ public void readModifyUpdate(String provider, String url) throws Exception
{
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ String key1 = newKey();
+
+ MyClass value1 = new MyClass("a", 1);
+ objCache.create(key1, value1).join();
+
+ objCache.readModifyUpdate(key1, v -> {
+ return new MyClass(v.a, v.b + 1);
+ }).join();
+
+ Optional<MyClass> newValue1 = objCache.get(key1).join();
+ assertTrue(newValue1.isPresent());
+ assertEquals(newValue1.get().a, "a");
+ assertEquals(newValue1.get().b, 2);
+
+ // Should fail if the key does not exist
+ try {
+ objCache.readModifyUpdate(newKey(), v -> {
+ return new MyClass(v.a, v.b + 1);
+ }).join();
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(), NotFoundException.class);
+ }
+ }
+
+ @Test(dataProvider = "impl")
+ public void readModifyUpdateOrCreate(String provider, String url) throws
Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ MetadataCache<MyClass> objCache =
store.getMetadataCache(MyClass.class);
+
+ String key1 = newKey();
+
+ objCache.readModifyUpdateOrCreate(key1, optValue -> {
+ if (optValue.isPresent()) {
+ return new MyClass(optValue.get().a, optValue.get().b + 1);
+ } else {
+ return new MyClass("a", 1);
+ }
+ }).join();
+
+ Optional<MyClass> newValue1 = objCache.get(key1).join();
+ assertTrue(newValue1.isPresent());
+ assertEquals(newValue1.get().a, "a");
+ assertEquals(newValue1.get().b, 1);
+
+ objCache.readModifyUpdateOrCreate(key1, optValue -> {
+ assertTrue(optValue.isPresent());
+ return new MyClass(optValue.get().a, optValue.get().b + 1);
+ }).join();
+
+ newValue1 = objCache.get(key1).join();
+ assertTrue(newValue1.isPresent());
+ assertEquals(newValue1.get().a, "a");
+ assertEquals(newValue1.get().b, 2);
+ }
+
+}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 70f8ea3..1a8be1e 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -27,7 +28,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -38,31 +42,11 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
import org.testng.annotations.Test;
-public class MetadataStoreTest {
-
- private TestZKServer zks;
-
- @BeforeClass
- void setup() throws Exception {
- zks = new TestZKServer();
- }
-
- @AfterClass
- void teardown() throws Exception {
- zks.close();
- }
-
- @DataProvider(name = "impl")
- public Object[][] implementations() {
- return new Object[][] {
- { "ZooKeeper", zks.getConnectionString() },
- };
- }
+public class MetadataStoreTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void emptyStoreTest(String provider, String url) throws Exception {
@@ -81,14 +65,14 @@ public class MetadataStoreTest {
store.delete("/non-existing-key", Optional.empty()).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(), NotFoundException.class);
+ assertException(e, NotFoundException.class);
}
try {
store.delete("/non-existing-key", Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(), NotFoundException.class);
+ assertException(e, NotFoundException.class);
}
}
@@ -103,14 +87,14 @@ public class MetadataStoreTest {
store.put(key1, "value-1".getBytes(), Optional.of(0L)).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(), BadVersionException.class);
+ assertException(e, BadVersionException.class);
}
try {
store.put(key1, "value-1".getBytes(), Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(), BadVersionException.class);
+ assertException(e, BadVersionException.class);
}
store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
@@ -125,16 +109,22 @@ public class MetadataStoreTest {
store.put(key1, "value-2".getBytes(), Optional.of(-1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(), BadVersionException.class);
+ assertException(e, BadVersionException.class);
}
try {
store.put(key1, "value-2".getBytes(), Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(), BadVersionException.class);
+ assertException(e, BadVersionException.class);
}
+ assertTrue(store.exists(key1).join());
+ optRes = store.get(key1).join();
+ assertTrue(optRes.isPresent());
+ assertEquals(optRes.get().getValue(), "value-1".getBytes());
+ assertEquals(optRes.get().getStat().getVersion(), 0);
+
store.put(key1, "value-2".getBytes(), Optional.of(0L)).join();
assertTrue(store.exists(key1).join());
@@ -188,7 +178,7 @@ public class MetadataStoreTest {
store.delete(key, Optional.empty()).join();
fail("The key has children");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
MetadataStoreException.class);
+ assertException(e, MetadataStoreException.class);
}
for (int i = 0; i < N; i++) {
@@ -196,7 +186,7 @@ public class MetadataStoreTest {
store.delete(key + "/c-" + i, Optional.of(1L)).join();
fail("The key has children");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
BadVersionException.class);
+ assertException(e, BadVersionException.class);
}
store.delete(key + "/c-" + i, Optional.empty()).join();
@@ -212,39 +202,98 @@ public class MetadataStoreTest {
store.delete("", Optional.empty()).join();
fail("The key cannot be empty");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
MetadataStoreException.class);
+ assertException(e, MetadataStoreException.class);
}
try {
store.getChildren("").join();
fail("The key cannot be empty");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
MetadataStoreException.class);
+ assertException(e, MetadataStoreException.class);
}
try {
store.get("").join();
fail("The key cannot be empty");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
MetadataStoreException.class);
+ assertException(e, MetadataStoreException.class);
}
try {
store.exists("").join();
fail("The key cannot be empty");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
MetadataStoreException.class);
+ assertException(e, MetadataStoreException.class);
}
try {
store.put("", new byte[0], Optional.empty()).join();
fail("The key cannot be empty");
} catch (CompletionException e) {
- assertEquals(e.getCause().getClass(),
MetadataStoreException.class);
+ assertException(e, MetadataStoreException.class);
}
}
- private static String newKey() {
- return "/key-" + System.nanoTime();
+ @Test(dataProvider = "impl")
+ public void notificationListeners(String provider, String url) throws
Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(url,
MetadataStoreConfig.builder().build());
+
+ BlockingQueue<Notification> notifications = new
LinkedBlockingDeque<>();
+ store.registerListener(n -> {
+ notifications.add(n);
+ });
+
+ String key1 = newKey();
+
+ assertFalse(store.get(key1).join().isPresent());
+
+ // Trigger created notification
+ store.put(key1, "value-1".getBytes(), Optional.empty()).join();
+ assertTrue(store.get(key1).join().isPresent());
+ assertEquals(store.getChildren(key1).join(), Collections.emptyList());
+
+ Notification n = notifications.poll(3, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getType(), NotificationType.Created);
+ assertEquals(n.getPath(), key1);
+
+ // Trigger modified notification
+ store.put(key1, "value-2".getBytes(), Optional.empty()).join();
+ n = notifications.poll(3, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getType(), NotificationType.Modified);
+ assertEquals(n.getPath(), key1);
+
+ // Trigger modified notification on the parent
+ String key1Child = key1 + "/xx";
+
+ assertFalse(store.get(key1Child).join().isPresent());
+
+ store.put(key1Child, "value-2".getBytes(), Optional.empty()).join();
+ n = notifications.poll(3, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getType(), NotificationType.Created);
+ assertEquals(n.getPath(), key1Child);
+
+ n = notifications.poll(3, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getType(), NotificationType.ChildrenChanged);
+ assertEquals(n.getPath(), key1);
+
+ assertTrue(store.exists(key1Child).join());
+ assertEquals(store.getChildren(key1).join(),
Collections.singletonList("xx"));
+
+ store.delete(key1Child, Optional.empty()).join();
+ n = notifications.poll(3, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getType(), NotificationType.Deleted);
+ assertEquals(n.getPath(), key1Child);
+
+ // Parent should be notified of the deletion
+ n = notifications.poll(3, TimeUnit.SECONDS);
+ assertNotNull(n);
+ assertEquals(n.getType(), NotificationType.ChildrenChanged);
+ assertEquals(n.getPath(), key1);
}
}