This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ac0e15d  PIP-45: Add MetadataCache implementation (#9148)
ac0e15d is described below

commit ac0e15d24e2a40a20d6889bf477ae9bd52bb37eb
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jan 8 16:57:21 2021 -0800

    PIP-45: Add MetadataCache implementation (#9148)
    
    ### Motivation
    
    Third part of implementation for PIP-45:
    
     * Added listener that can be added to the store
     * Defined `MetadataCache` that acts as an object cache on top of the data 
store
     * Added `LocalMemoryMetadataStore` as a reference implementation that can 
be used in tests.
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |   2 +-
 .../bookkeeper/mledger/impl/MetaStoreImplTest.java |   2 +-
 pulsar-metadata/pom.xml                            |   6 +
 .../apache/pulsar/metadata/api/MetadataStore.java  |  33 ++-
 .../metadata/api/MetadataStoreException.java       |  75 +++++-
 .../pulsar/metadata/api/MetadataStoreFactory.java  |   9 +-
 ...tadataStoreException.java => Notification.java} |  29 +--
 ...taStoreException.java => NotificationType.java} |  33 +--
 .../pulsar/metadata/cache/MetadataCache.java       | 133 +++++++++++
 .../impl/JSONMetadataSerdeSimpleType.java}         |  37 ++-
 .../impl/JSONMetadataSerdeTypeRef.java}            |  37 ++-
 .../metadata/cache/impl/MetadataCacheImpl.java     | 246 +++++++++++++++++++
 .../impl/MetadataSerde.java}                       |  29 +--
 .../metadata/impl/AbstractMetadataStore.java       | 162 +++++++++++++
 .../metadata/impl/LocalMemoryMetadataStore.java    | 192 +++++++++++++++
 .../impl/{zookeeper => }/ZKMetadataStore.java      |  94 ++++++--
 .../pulsar/metadata/BaseMetadataStoreTest.java     |  61 +++++
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 261 +++++++++++++++++++++
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 125 +++++++---
 20 files changed, 1384 insertions(+), 184 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 7efb471..ab28046 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -82,7 +82,7 @@ import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 4406df6..18bd70c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -109,7 +109,7 @@ import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
index e41c550..98a0433 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
@@ -32,7 +32,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.metadata.api.Stat;
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index fbcd511..dd9c668 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -34,6 +34,12 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 16b0496..3cafa17 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -18,14 +18,17 @@
  */
 package org.apache.pulsar.metadata.api;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.Beta;
 
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.cache.MetadataCache;
 
 /**
  * Metadata store client interface.
@@ -56,7 +59,7 @@ public interface MetadataStore extends AutoCloseable {
      * If the path itself does not exist, it will return an empty list.
      *
      * @param path
-     *            webSocketProxyEnabled
+     *            the path of the key to get from the store
      * @return a future to track the async request
      */
     CompletableFuture<List<String>> getChildren(String path);
@@ -109,4 +112,32 @@ public interface MetadataStore extends AutoCloseable {
      * @return a future to track the async request
      */
     CompletableFuture<Void> delete(String path, Optional<Long> 
expectedVersion);
+
+    /**
+     * Register a listener that will be called on changes in the underlying 
store.
+     *
+     * @param listener
+     *            a consumer of notifications
+     */
+    void registerListener(Consumer<Notification> listener);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param clazz
+     *            the class type to be used for serialization/deserialization
+     * @return the metadata cache object
+     */
+    <T> MetadataCache<T> getMetadataCache(Class<T> clazz);
+
+    /**
+     * Create a metadata cache specialized for a specific class.
+     *
+     * @param <T>
+     * @param typeRef
+     *            the type ref description to be used for 
serialization/deserialization
+     * @return the metadata cache object
+     */
+    <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef);
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
index ab981df..c5fdb27 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.metadata.api;
 
 import java.io.IOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Generic metadata store exception.
@@ -29,13 +31,38 @@ public class MetadataStoreException extends IOException {
         super(t);
     }
 
+    public MetadataStoreException(String msg) {
+        super(msg);
+    }
+
     /**
-     * Value not found in store.
+     * Key not found in store.
      */
     public static class NotFoundException extends MetadataStoreException {
+        public NotFoundException() {
+            super((Throwable)null);
+        }
+
         public NotFoundException(Throwable t) {
             super(t);
         }
+
+        public NotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
+     * Key was already in store.
+     */
+    public static class AlreadyExistsException extends MetadataStoreException {
+        public AlreadyExistsException(Throwable t) {
+            super(t);
+        }
+
+        public AlreadyExistsException(String msg) {
+            super(msg);
+        }
     }
 
     /**
@@ -45,5 +72,51 @@ public class MetadataStoreException extends IOException {
         public BadVersionException(Throwable t) {
             super(t);
         }
+
+        public BadVersionException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
+     * Failed to de-serialize the metadata.
+     */
+    public static class ContentDeserializationException extends 
MetadataStoreException {
+        public ContentDeserializationException(Throwable t) {
+            super(t);
+        }
+
+        public ContentDeserializationException(String msg) {
+            super(msg);
+        }
+    }
+
+    public static MetadataStoreException unwrap(Throwable t) {
+        if (t instanceof MetadataStoreException) {
+            return (MetadataStoreException) t;
+        } else if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        } else if (t instanceof InterruptedException) {
+            return new MetadataStoreException(t);
+        } else if (!(t instanceof ExecutionException) && !(t instanceof 
CompletionException)) {
+            // Generic exception
+            return new MetadataStoreException(t);
+        }
+
+        // Unwrap the exception to keep the same exception type but a stack 
trace that includes the application calling
+        // site
+        Throwable cause = t.getCause();
+        String msg = cause.getMessage();
+        if (cause instanceof NotFoundException) {
+            return new NotFoundException(msg);
+        } else if (cause instanceof AlreadyExistsException) {
+            return new AlreadyExistsException(msg);
+        } else if (cause instanceof BadVersionException) {
+            return new BadVersionException(msg);
+        } else if (cause instanceof ContentDeserializationException) {
+            return new ContentDeserializationException(msg);
+        } else {
+            return new MetadataStoreException(t);
+        }
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
index d44b666..efd238e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreFactory.java
@@ -22,7 +22,8 @@ import java.io.IOException;
 
 import lombok.experimental.UtilityClass;
 
-import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 
 /**
  * Factory class for {@link MetadataStore}.
@@ -41,6 +42,10 @@ public class MetadataStoreFactory {
      *             if the metadata store initialization fails
      */
     public static MetadataStore create(String metadataURL, MetadataStoreConfig 
metadataStoreConfig) throws IOException {
-        return new ZKMetadataStore(metadataURL, metadataStoreConfig);
+        if (metadataURL.startsWith("memory://")) {
+            return new LocalMemoryMetadataStore(metadataURL, 
metadataStoreConfig);
+        } else {
+            return new ZKMetadataStore(metadataURL, metadataStoreConfig);
+        }
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notification.java
similarity index 58%
copy from 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notification.java
index ab981df..d27bfd4 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Notification.java
@@ -18,32 +18,17 @@
  */
 package org.apache.pulsar.metadata.api;
 
-import java.io.IOException;
-
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
-
-    public MetadataStoreException(Throwable t) {
-        super(t);
-    }
+import lombok.Data;
 
+@Data
+public final class Notification {
     /**
-     * Value not found in store.
+     * The type of the event being notified.
      */
-    public static class NotFoundException extends MetadataStoreException {
-        public NotFoundException(Throwable t) {
-            super(t);
-        }
-    }
+    private final NotificationType type;
 
     /**
-     * Unsuccessful update due to mismatched expected version.
+     * Path of the kev/value pair interested by the notification
      */
-    public static class BadVersionException extends MetadataStoreException {
-        public BadVersionException(Throwable t) {
-            super(t);
-        }
-    }
+    private final String path;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/NotificationType.java
similarity index 56%
copy from 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/NotificationType.java
index ab981df..bbc1dfd 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/NotificationType.java
@@ -18,32 +18,9 @@
  */
 package org.apache.pulsar.metadata.api;
 
-import java.io.IOException;
-
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
-
-    public MetadataStoreException(Throwable t) {
-        super(t);
-    }
-
-    /**
-     * Value not found in store.
-     */
-    public static class NotFoundException extends MetadataStoreException {
-        public NotFoundException(Throwable t) {
-            super(t);
-        }
-    }
-
-    /**
-     * Unsuccessful update due to mismatched expected version.
-     */
-    public static class BadVersionException extends MetadataStoreException {
-        public BadVersionException(Throwable t) {
-            super(t);
-        }
-    }
+public enum NotificationType {
+    Created,
+    Modified,
+    ChildrenChanged,
+    Deleted
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/MetadataCache.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/MetadataCache.java
new file mode 100644
index 0000000..17aa6b3
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/MetadataCache.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.cache;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+
+/**
+ * Represent the caching layer access for a specific type of objects.
+ */
+public interface MetadataCache<T> {
+
+    /**
+     * Tries to fetch one item from the cache or fallback to the store if not 
present.
+     * <p>
+     * If the key is not found, the {@link Optional} will be empty.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Optional<T>> get(String path);
+
+    /**
+     * Check if an object is present in cache without triggering a load from 
the metadata store.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @return the cached object or an empty {@link Optional} is the cache 
doesn't have the object
+     */
+    Optional<T> getIfCached(String path);
+
+    /**
+     * Return all the nodes (lexicographically sorted) that are children to 
the specific path.
+     *
+     * If the path itself does not exist, it will return an empty list.
+     *
+     * @param path
+     *            the path of the key to get from the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<List<String>> getChildren(String path);
+
+    /**
+     * Read whether a specific path exists.
+     *
+     * Note: In case of keys with multiple levels (eg: '/a/b/c'), checking the 
existence of a parent (eg. '/a') might
+     * not necessarily return true, unless the key had been explicitly created.
+     *
+     * @param path
+     *            the path of the key to check on the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<Boolean> exists(String path);
+
+    /**
+     * Perform an atomic read-modify-update of the value.
+     * <p>
+     * The modify function can potentially be called multiple times if there 
are concurrent updates happening.
+     * <p>
+     * If the object does not exist yet, the <code>modifyFunction</code> will 
get passed an {@link Optional#empty()}
+     * object.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @param modifyFunction
+     *            a function that will be passed the current value and returns 
a modified value to be stored
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> readModifyUpdateOrCreate(String path, 
Function<Optional<T>, T> modifyFunction);
+
+    /**
+     * Perform an atomic read-modify-update of the value.
+     * <p>
+     * The modify function can potentially be called multiple times if there 
are concurrent updates happening.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @param modifyFunction
+     *            a function that will be passed the current value and returns 
a modified value to be stored
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> 
modifyFunction);
+
+    /**
+     * Create a new object in the metadata store.
+     * <p>
+     * This operation will make sure to keep the cache consistent.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @param value
+     *            the object to insert in metadata store
+     * @return a future to track the completion of the operation
+     * @throws AlreadyExistsException
+     *             If the object is already present.
+     */
+    CompletableFuture<Void> create(String path, T value);
+
+    /**
+     * Delete an object from the metadata store.
+     * <p>
+     * This operation will make sure to keep the cache consistent.
+     *
+     * @param path
+     *            the path of the object in the metadata store
+     * @return a future to track the completion of the operation
+     * @throws NotFoundException
+     *             if the object is not present in the metadata store.
+     */
+    CompletableFuture<Void> delete(String path);
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
similarity index 56%
copy from 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
index ab981df..d3f071b 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeSimpleType.java
@@ -16,34 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.metadata.api;
+package org.apache.pulsar.metadata.cache.impl;
+
+import com.fasterxml.jackson.databind.JavaType;
 
 import java.io.IOException;
 
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+public class JSONMetadataSerdeSimpleType<T> implements MetadataSerde<T> {
+
+    private final JavaType typeRef;
 
-    public MetadataStoreException(Throwable t) {
-        super(t);
+    public JSONMetadataSerdeSimpleType(JavaType typeRef) {
+        this.typeRef = typeRef;
     }
 
-    /**
-     * Value not found in store.
-     */
-    public static class NotFoundException extends MetadataStoreException {
-        public NotFoundException(Throwable t) {
-            super(t);
-        }
+    @Override
+    public byte[] serialize(T value) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
     }
 
-    /**
-     * Unsuccessful update due to mismatched expected version.
-     */
-    public static class BadVersionException extends MetadataStoreException {
-        public BadVersionException(Throwable t) {
-            super(t);
-        }
+    @Override
+    public T deserialize(byte[] content) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().readValue(content, 
typeRef);
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeTypeRef.java
similarity index 55%
copy from 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeTypeRef.java
index ab981df..4b37898 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/JSONMetadataSerdeTypeRef.java
@@ -16,34 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.metadata.api;
+package org.apache.pulsar.metadata.cache.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
 
 import java.io.IOException;
 
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+
+public class JSONMetadataSerdeTypeRef<T> implements MetadataSerde<T> {
+
+    private final TypeReference<T> typeRef;
 
-    public MetadataStoreException(Throwable t) {
-        super(t);
+    public JSONMetadataSerdeTypeRef(TypeReference<T> typeRef) {
+        this.typeRef = typeRef;
     }
 
-    /**
-     * Value not found in store.
-     */
-    public static class NotFoundException extends MetadataStoreException {
-        public NotFoundException(Throwable t) {
-            super(t);
-        }
+    @Override
+    public byte[] serialize(T value) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value);
     }
 
-    /**
-     * Unsuccessful update due to mismatched expected version.
-     */
-    public static class BadVersionException extends MetadataStoreException {
-        public BadVersionException(Throwable t) {
-            super(t);
-        }
+    @Override
+    public T deserialize(byte[] content) throws IOException {
+        return ObjectMapperFactory.getThreadLocal().readValue(content, 
typeRef);
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
new file mode 100644
index 0000000..958036a
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.cache.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.cache.MetadataCache;
+
+public class MetadataCacheImpl<T> implements MetadataCache<T>, 
Consumer<Notification> {
+
+    private static final long CACHE_REFRESH_TIME_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+
+    private final MetadataStore store;
+    private final MetadataSerde<T> serde;
+
+    private final AsyncLoadingCache<String, Optional<Entry<T, Stat>>> objCache;
+
+    public MetadataCacheImpl(MetadataStore store, TypeReference<T> typeRef) {
+        this(store, new JSONMetadataSerdeTypeRef<>(typeRef));
+    }
+
+    public MetadataCacheImpl(MetadataStore store, JavaType type) {
+        this(store, new JSONMetadataSerdeSimpleType<>(type));
+    }
+
+    private MetadataCacheImpl(MetadataStore store, MetadataSerde<T> serde) {
+        this.store = store;
+        this.serde = serde;
+
+        this.objCache = Caffeine.newBuilder()
+                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, 
TimeUnit.MILLISECONDS)
+                .buildAsync(new AsyncCacheLoader<String, Optional<Entry<T, 
Stat>>>() {
+                    @Override
+                    public CompletableFuture<Optional<Entry<T, Stat>>> 
asyncLoad(String key, Executor executor) {
+                        return readValueFromStore(key);
+                    }
+
+                    @Override
+                    public CompletableFuture<Optional<Entry<T, Stat>>> 
asyncReload(String key,
+                            Optional<Entry<T, Stat>> oldValue, Executor 
executor) {
+                        return readValueFromStore(key);
+                    }
+                });
+    }
+
+    private CompletableFuture<Optional<Entry<T, Stat>>> 
readValueFromStore(String path) {
+        return store.get(path)
+                .thenCompose(optRes -> {
+                    if (!optRes.isPresent()) {
+                        return FutureUtils.value(Optional.empty());
+                    }
+
+                    try {
+                        T obj = serde.deserialize(optRes.get().getValue());
+                        return FutureUtils
+                                .value(Optional.of(new SimpleImmutableEntry<T, 
Stat>(obj, optRes.get().getStat())));
+                    } catch (Throwable t) {
+                        return FutureUtils.exception(new 
ContentDeserializationException(t));
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<Optional<T>> get(String path) {
+        return objCache.get(path)
+                .thenApply(optRes -> optRes.map(Entry::getKey));
+    }
+
+    @Override
+    public Optional<T> getIfCached(String path) {
+        CompletableFuture<Optional<Map.Entry<T, Stat>>> future = 
objCache.getIfPresent(path);
+        if (future != null && future.isDone() && 
!future.isCompletedExceptionally()) {
+            return future.join().map(Entry::getKey);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> readModifyUpdateOrCreate(String path, 
Function<Optional<T>, T> modifyFunction) {
+        return objCache.get(path)
+                .thenCompose(optEntry -> {
+                    Optional<T> currentValue;
+                    long expectedVersion;
+
+                    if (optEntry.isPresent()) {
+                        currentValue = Optional.of(optEntry.get().getKey());
+                        expectedVersion = 
optEntry.get().getValue().getVersion();
+                    } else {
+                        currentValue = Optional.empty();
+                        expectedVersion = -1;
+                    }
+
+                    T newValueObj;
+                    byte[] newValue;
+                    try {
+                        newValueObj = modifyFunction.apply(currentValue);
+                        newValue = serde.serialize(newValueObj);
+                    } catch (Throwable t) {
+                        return FutureUtils.exception(t);
+                    }
+
+                    return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(stat -> {
+                        // Make sure we have the value cached before the 
operation is completed
+                        objCache.put(path,
+                                FutureUtils.value(Optional.of(new 
SimpleImmutableEntry<T, Stat>(newValueObj, stat))));
+                    });
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> readModifyUpdate(String path, Function<T, 
T> modifyFunction) {
+        return objCache.get(path)
+                .thenCompose(optEntry -> {
+                    if (!optEntry.isPresent()) {
+                        return FutureUtils.exception(new 
NotFoundException(""));
+                    }
+
+                    Map.Entry<T, Stat> entry = optEntry.get();
+                    T currentValue = entry.getKey();
+                    long expectedVersion = 
optEntry.get().getValue().getVersion();
+
+                    T newValueObj;
+                    byte[] newValue;
+                    try {
+                        newValueObj = modifyFunction.apply(currentValue);
+                        newValue = serde.serialize(newValueObj);
+                    } catch (Throwable t) {
+                        return FutureUtils.exception(t);
+                    }
+
+                    return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(stat -> {
+                        // Make sure we have the value cached before the 
operation is completed
+                        objCache.put(path,
+                                FutureUtils.value(Optional.of(new 
SimpleImmutableEntry<T, Stat>(newValueObj, stat))));
+                    });
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> create(String path, T value) {
+        byte[] content;
+        try {
+            content = serde.serialize(value);
+        } catch (Throwable t) {
+            return FutureUtils.exception(t);
+        }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        store.put(path, content, Optional.of(-1L))
+                .thenAccept(stat -> {
+                    // Make sure we have the value cached before the operation 
is completed
+                    objCache.put(path, FutureUtils.value(Optional.of(new 
SimpleImmutableEntry<T, Stat>(value, stat))));
+                    future.complete(null);
+                }).exceptionally(ex -> {
+                    if (ex.getCause() instanceof BadVersionException) {
+                        // Use already exists exception to provide more 
self-explanatory error message
+                        future.completeExceptionally(new 
AlreadyExistsException(ex.getCause()));
+                    } else {
+                        future.completeExceptionally(ex.getCause());
+                    }
+                    return null;
+                });
+
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> delete(String path) {
+        return store.delete(path, Optional.empty())
+                .thenAccept(v -> {
+                    // Mark in the cache that the object was removed
+                    objCache.put(path, FutureUtils.value(Optional.empty()));
+                });
+    }
+
+    @Override
+    public CompletableFuture<Boolean> exists(String path) {
+        return store.exists(path);
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getChildren(String path) {
+        return store.getChildren(path);
+    }
+
+    @Override
+    public void accept(Notification t) {
+        String path = t.getPath();
+        switch (t.getType()) {
+        case Created:
+        case Modified:
+            if (objCache.synchronous().getIfPresent(path) != null) {
+                // Trigger background refresh of the cached item
+                objCache.synchronous().refresh(path);
+            }
+            break;
+
+        case Deleted:
+            objCache.synchronous().invalidate(path);
+            break;
+
+        default:
+            break;
+        }
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataSerde.java
similarity index 55%
copy from 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
copy to 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataSerde.java
index ab981df..a1a3b03 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreException.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataSerde.java
@@ -16,34 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.metadata.api;
+package org.apache.pulsar.metadata.cache.impl;
 
 import java.io.IOException;
 
-/**
- * Generic metadata store exception.
- */
-public class MetadataStoreException extends IOException {
-
-    public MetadataStoreException(Throwable t) {
-        super(t);
-    }
+public interface MetadataSerde<T> {
 
-    /**
-     * Value not found in store.
-     */
-    public static class NotFoundException extends MetadataStoreException {
-        public NotFoundException(Throwable t) {
-            super(t);
-        }
-    }
+    byte[] serialize(T value) throws IOException;
 
-    /**
-     * Unsuccessful update due to mismatched expected version.
-     */
-    public static class BadVersionException extends MetadataStoreException {
-        public BadVersionException(Throwable t) {
-            super(t);
-        }
-    }
+    T deserialize(byte[] content) throws IOException;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
new file mode 100644
index 0000000..f6979d5
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.cache.MetadataCache;
+import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+
+@Slf4j
+public abstract class AbstractMetadataStore implements MetadataStore, 
Consumer<Notification> {
+
+    private static final long CACHE_REFRESH_TIME_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+
+    private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new 
CopyOnWriteArrayList<>();
+    protected final ExecutorService executor;
+    private final AsyncLoadingCache<String, List<String>> childrenCache;
+    private final AsyncLoadingCache<String, Boolean> existsCache;
+    private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = 
new CopyOnWriteArrayList<>();
+
+    protected abstract CompletableFuture<List<String>> 
getChildrenFromStore(String path);
+
+    protected abstract CompletableFuture<Boolean> existsFromStore(String path);
+
+    protected AbstractMetadataStore() {
+        this.executor = Executors
+                .newSingleThreadExecutor(new 
DefaultThreadFactory("metadata-store"));
+        registerListener(this);
+
+        this.childrenCache = Caffeine.newBuilder()
+                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, 
TimeUnit.MILLISECONDS)
+                .buildAsync(new AsyncCacheLoader<String, List<String>>() {
+                    @Override
+                    public CompletableFuture<List<String>> asyncLoad(String 
key, Executor executor) {
+                        return getChildrenFromStore(key);
+                    }
+
+                    @Override
+                    public CompletableFuture<List<String>> asyncReload(String 
key, List<String> oldValue,
+                            Executor executor) {
+                        return getChildrenFromStore(key);
+                    }
+                });
+
+        this.existsCache = Caffeine.newBuilder()
+                .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, 
TimeUnit.MILLISECONDS)
+                .buildAsync(new AsyncCacheLoader<String, Boolean>() {
+                    @Override
+                    public CompletableFuture<Boolean> asyncLoad(String key, 
Executor executor) {
+                        return existsFromStore(key);
+                    }
+
+                    @Override
+                    public CompletableFuture<Boolean> asyncReload(String key, 
Boolean oldValue,
+                            Executor executor) {
+                        return existsFromStore(key);
+                    }
+                });
+    }
+
+    @Override
+    public <T> MetadataCache<T> getMetadataCache(Class<T> clazz) {
+        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this,
+                TypeFactory.defaultInstance().constructSimpleType(clazz, 
null));
+        metadataCaches.add(metadataCache);
+        return metadataCache;
+    }
+
+    @Override
+    public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef) {
+        MetadataCacheImpl<T> metadataCache = new MetadataCacheImpl<T>(this, 
typeRef);
+        metadataCaches.add(metadataCache);
+        return metadataCache;
+    }
+
+    @Override
+    public final CompletableFuture<List<String>> getChildren(String path) {
+        return childrenCache.get(path);
+    }
+
+    @Override
+    public final CompletableFuture<Boolean> exists(String path) {
+        return existsCache.get(path);
+    }
+
+    @Override
+    public void registerListener(Consumer<Notification> listener) {
+        listeners.add(listener);
+    }
+
+    protected void receivedNotification(Notification notification) {
+        executor.execute(() -> {
+            listeners.forEach(listener -> {
+                try {
+                    listener.accept(notification);
+                } catch (Throwable t) {
+                    log.error("Failed to process metadata store notification", 
t);
+                }
+            });
+        });
+    }
+
+    @Override
+    public void accept(Notification n) {
+        String path = n.getPath();
+        NotificationType type = n.getType();
+
+        if (type == NotificationType.Created || type == 
NotificationType.Deleted) {
+            existsCache.synchronous().invalidate(path);
+        }
+
+        if (type == NotificationType.ChildrenChanged) {
+            childrenCache.synchronous().invalidate(path);
+        }
+
+        if (type == NotificationType.Created || type == 
NotificationType.Deleted || type == NotificationType.Modified) {
+            metadataCaches.forEach(c -> c.accept(n));
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        executor.shutdownNow();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
new file mode 100644
index 0000000..83fdb31
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import lombok.Data;
+
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+
+public class LocalMemoryMetadataStore extends AbstractMetadataStore implements 
MetadataStore {
+
+    @Data
+    private static class Value {
+        final long version;
+        final byte[] data;
+        final long createdTimestamp;
+        final long modifiedTimestamp;
+    }
+
+    private final NavigableMap<String, Value> map;
+
+    public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig) throws IOException {
+        map = new TreeMap<>();
+    }
+
+    @Override
+    public synchronized CompletableFuture<Optional<GetResult>> get(String 
path) {
+        if (!isValidPath(path)) {
+            return FutureUtils.exception(new MetadataStoreException(""));
+        }
+
+        Value v = map.get(path);
+        if (v != null) {
+            return FutureUtils.value(
+                    Optional.of(new GetResult(v.data, new Stat(v.version, 
v.createdTimestamp, v.modifiedTimestamp))));
+        } else {
+            return FutureUtils.value(Optional.empty());
+        }
+    }
+
+    @Override
+    public synchronized CompletableFuture<List<String>> 
getChildrenFromStore(String path) {
+        if (!isValidPath(path)) {
+            return FutureUtils.exception(new MetadataStoreException(""));
+        }
+
+        String firstKey = path.equals("/") ? path : path + "/";
+        String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is 
lexicographically just after '/'
+
+        List<String> children = new ArrayList<>();
+        map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+            String relativePath = key.replace(firstKey, "");
+            if (!relativePath.contains("/")) {
+                // Only return first-level children
+                children.add(relativePath);
+            }
+        });
+
+        return FutureUtils.value(children);
+    }
+
+    @Override
+    public synchronized CompletableFuture<Boolean> existsFromStore(String 
path) {
+        if (!isValidPath(path)) {
+            return FutureUtils.exception(new MetadataStoreException(""));
+        }
+
+        Value v = map.get(path);
+        return FutureUtils.value(v != null ? true : false);
+    }
+
+    @Override
+    public synchronized CompletableFuture<Stat> put(String path, byte[] data, 
Optional<Long> optExpectedVersion) {
+        if (!isValidPath(path)) {
+            return FutureUtils.exception(new MetadataStoreException(""));
+        }
+
+        boolean hasVersion = optExpectedVersion.isPresent();
+        int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
+
+        long now = System.currentTimeMillis();
+
+        if (hasVersion && expectedVersion == -1) {
+            Value newValue = new Value(0, data, now, now);
+            Value existingValue = map.putIfAbsent(path, newValue);
+            if (existingValue != null) {
+                return FutureUtils.exception(new BadVersionException(""));
+            } else {
+                receivedNotification(new 
Notification(NotificationType.Created, path));
+                String parent = parent(path);
+                if (parent != null) {
+                    receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
+                }
+                return FutureUtils.value(new Stat(0, now, now));
+            }
+        } else {
+            Value existingValue = map.get(path);
+            long existingVersion = existingValue != null ? 
existingValue.version : -1;
+            if (hasVersion && expectedVersion != existingVersion) {
+                return FutureUtils.exception(new BadVersionException(""));
+            } else {
+                long newVersion = existingValue != null ? 
existingValue.version + 1 : 0;
+                long createdTimestamp = existingValue != null ? 
existingValue.createdTimestamp : now;
+                Value newValue = new Value(newVersion, data, createdTimestamp, 
now);
+                map.put(path, newValue);
+
+                NotificationType type = existingValue == null ? 
NotificationType.Created : NotificationType.Modified;
+                receivedNotification(new Notification(type, path));
+                if (type == NotificationType.Created) {
+                    String parent = parent(path);
+                    if (parent != null) {
+                        receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
+                    }
+                }
+                return FutureUtils
+                        .value(new Stat(newValue.version, 
newValue.createdTimestamp, newValue.modifiedTimestamp));
+            }
+        }
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> delete(String path, 
Optional<Long> optExpectedVersion) {
+        if (!isValidPath(path)) {
+            return FutureUtils.exception(new MetadataStoreException(""));
+        }
+
+        Value value = map.get(path);
+        if (value == null) {
+            return FutureUtils.exception(new NotFoundException(""));
+        } else if (value != null && optExpectedVersion.isPresent() && 
optExpectedVersion.get() != value.version) {
+            return FutureUtils.exception(new BadVersionException(""));
+        } else {
+            map.remove(path);
+            receivedNotification(new Notification(NotificationType.Deleted, 
path));
+            String parent = parent(path);
+            if (parent != null) {
+                receivedNotification(new 
Notification(NotificationType.ChildrenChanged, parent));
+            }
+            return FutureUtils.value(null);
+        }
+    }
+
+    private static boolean isValidPath(String path) {
+        if (path == null || !path.startsWith("/")) {
+            return false;
+        }
+
+        return path != "/" || !path.endsWith("/");
+    }
+
+    private static String parent(String path) {
+        int idx = path.lastIndexOf('/');
+        if (idx <= 0) {
+            // No parent
+            return null;
+        }
+
+        return path.substring(0, idx);
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
similarity index 78%
rename from 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
rename to 
pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index e35b583..4b8135b 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/zookeeper/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -16,15 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.metadata.impl.zookeeper;
+package org.apache.pulsar.metadata.impl;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
@@ -35,22 +37,22 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
-public class ZKMetadataStore implements MetadataStore {
+@Slf4j
+public class ZKMetadataStore extends AbstractMetadataStore implements 
MetadataStore, Watcher {
 
     private final boolean isZkManaged;
     private final ZooKeeper zkc;
-    private final ExecutorService executor;
 
     public ZKMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig) throws IOException {
         try {
@@ -62,15 +64,12 @@ public class ZKMetadataStore implements MetadataStore {
         } catch (KeeperException | InterruptedException e) {
             throw new IOException(e);
         }
-
-        this.executor = Executors.newSingleThreadExecutor(new 
DefaultThreadFactory("zk-metadata-store-callback"));
     }
 
     @VisibleForTesting
     public ZKMetadataStore(ZooKeeper zkc) {
         this.isZkManaged = false;
         this.zkc = zkc;
-        this.executor = Executors.newSingleThreadExecutor(new 
DefaultThreadFactory("zk-metadata-store-callback"));
     }
 
     @Override
@@ -78,12 +77,29 @@ public class ZKMetadataStore implements MetadataStore {
         CompletableFuture<Optional<GetResult>> future = new 
CompletableFuture<>();
 
         try {
-            zkc.getData(path, null, (rc, path1, ctx, data, stat) -> {
+            zkc.getData(path, this, (rc, path1, ctx, data, stat) -> {
                 executor.execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(Optional.of(new GetResult(data, 
getStat(stat))));
                     } else if (code == Code.NONODE) {
+                        // Place a watch on the non-existing node, so we'll 
get notified
+                        // when it gets created and we can invalidate the 
negative cache.
+                        existsFromStore(path).thenAccept(exists -> {
+                            if (exists) {
+                                get(path).thenAccept(c -> future.complete(c))
+                                        .exceptionally(ex -> {
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        });
+                            } else {
+                                // Z-node does not exist
+                                future.complete(Optional.empty());
+                            }
+                        }).exceptionally(ex -> {
+                            future.completeExceptionally(ex);
+                            return null;
+                        });
                         future.complete(Optional.empty());
                     } else {
                         future.completeExceptionally(getException(code, path));
@@ -98,11 +114,11 @@ public class ZKMetadataStore implements MetadataStore {
     }
 
     @Override
-    public CompletableFuture<List<String>> getChildren(String path) {
+    public CompletableFuture<List<String>> getChildrenFromStore(String path) {
         CompletableFuture<List<String>> future = new CompletableFuture<>();
 
         try {
-            zkc.getChildren(path, null, (rc, path1, ctx, children) -> {
+            zkc.getChildren(path, this, (rc, path1, ctx, children) -> {
                 executor.execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
@@ -114,9 +130,9 @@ public class ZKMetadataStore implements MetadataStore {
                         // been created after the call to getChildren, but 
before the call to exists().
                         // If this is the case, exists will return true, and 
we just call getChildren
                         // again.
-                        exists(path).thenAccept(exists -> {
+                        existsFromStore(path).thenAccept(exists -> {
                             if (exists) {
-                                getChildren(path).thenAccept(c -> 
future.complete(c)).exceptionally(ex -> {
+                                getChildrenFromStore(path).thenAccept(c -> 
future.complete(c)).exceptionally(ex -> {
                                     future.completeExceptionally(ex);
                                     return null;
                                 });
@@ -128,8 +144,6 @@ public class ZKMetadataStore implements MetadataStore {
                             future.completeExceptionally(ex);
                             return null;
                         });
-
-                        future.complete(Collections.emptyList());
                     } else {
                         future.completeExceptionally(getException(code, path));
                     }
@@ -143,11 +157,11 @@ public class ZKMetadataStore implements MetadataStore {
     }
 
     @Override
-    public CompletableFuture<Boolean> exists(String path) {
+    public CompletableFuture<Boolean> existsFromStore(String path) {
         CompletableFuture<Boolean> future = new CompletableFuture<>();
 
         try {
-            zkc.exists(path, null, (rc, path1, ctx, stat) -> {
+            zkc.exists(path, this, (rc, path1, ctx, stat) -> {
                 executor.execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
@@ -250,7 +264,7 @@ public class ZKMetadataStore implements MetadataStore {
         if (isZkManaged) {
             zkc.close();
         }
-        executor.shutdownNow();
+        super.close();
     }
 
     private static Stat getStat(org.apache.zookeeper.data.Stat zkStat) {
@@ -269,4 +283,40 @@ public class ZKMetadataStore implements MetadataStore {
             return new MetadataStoreException(ex);
         }
     }
+
+    @Override
+    public void process(WatchedEvent event) {
+        if (log.isDebugEnabled()) {
+            log.debug("Received ZK watch : {}", event);
+        }
+        String path = event.getPath();
+        if (path == null) {
+            // Ignore Session events
+            return;
+        }
+
+        NotificationType type;
+        switch (event.getType()) {
+        case NodeCreated:
+            type = NotificationType.Created;
+            break;
+
+        case NodeDataChanged:
+            type = NotificationType.Modified;
+            break;
+
+        case NodeChildrenChanged:
+            type = NotificationType.ChildrenChanged;
+            break;
+
+        case NodeDeleted:
+            type = NotificationType.Deleted;
+            break;
+
+        default:
+            return;
+        }
+
+        receivedNotification(new Notification(type, event.getPath()));
+    }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
new file mode 100644
index 0000000..7319c20
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertTrue;
+
+import java.util.concurrent.CompletionException;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+
+public abstract class BaseMetadataStoreTest {
+    protected TestZKServer zks;
+
+    @BeforeClass
+    void setup() throws Exception {
+        zks = new TestZKServer();
+    }
+
+    @AfterClass
+    void teardown() throws Exception {
+        zks.close();
+    }
+
+    @DataProvider(name = "impl")
+    public Object[][] implementations() {
+        return new Object[][] {
+                { "ZooKeeper", zks.getConnectionString() },
+                { "Memory", "memory://local" },
+        };
+    }
+
+    protected String newKey() {
+        return "/key-" + System.nanoTime();
+    }
+
+    static void assertException(CompletionException e, Class<?> clazz) {
+        assertException(e.getCause(), clazz);
+    }
+
+    static void assertException(Throwable t, Class<?> clazz) {
+        assertTrue(clazz.isInstance(t), String.format("Exception %s is not of 
type %s", t.getClass(), clazz));
+    }
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
new file mode 100644
index 0000000..44de394
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletionException;
+
+import lombok.AllArgsConstructor;
+import lombok.Cleanup;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.cache.MetadataCache;
+import org.testng.annotations.Test;
+
+public class MetadataCacheTest extends BaseMetadataStoreTest {
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    static class MyClass {
+        String a;
+        int b;
+    }
+
+    @Test(dataProvider = "impl")
+    public void emptyCacheTest(String provider, String url) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        assertEquals(objCache.getIfCached("/non-existing-key"), 
Optional.empty());
+        assertEquals(objCache.getIfCached("/non-existing-key/child"), 
Optional.empty());
+
+        assertEquals(objCache.get("/non-existing-key").join(), 
Optional.empty());
+        assertEquals(objCache.get("/non-existing-key/child").join(), 
Optional.empty());
+
+        try {
+            objCache.delete("/non-existing-key").join();
+            fail("should have failed");
+        } catch (CompletionException e) {
+            assertEquals(e.getCause().getClass(), NotFoundException.class);
+        }
+
+        try {
+            objCache.delete("/non-existing-key/child").join();
+            fail("should have failed");
+        } catch (CompletionException e) {
+            assertEquals(e.getCause().getClass(), NotFoundException.class);
+        }
+    }
+
+    @Test(dataProvider = "impl")
+    public void insertionDeletionWitGenericType(String provider, String url) 
throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<Map<String, String>> objCache = store
+                .getMetadataCache(new TypeReference<Map<String, String>>() {
+                });
+
+        String key1 = newKey();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+
+        Map<String, String> v = new TreeMap<>();
+        v.put("a", "1");
+        v.put("b", "2");
+        objCache.create(key1, v).join();
+
+        assertEquals(objCache.getIfCached(key1), Optional.of(v));
+        assertEquals(objCache.get(key1).join(), Optional.of(v));
+
+        objCache.delete(key1).join();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+        assertEquals(objCache.get(key1).join(), Optional.empty());
+    }
+
+    @Test(dataProvider = "impl")
+    public void insertionDeletion(String provider, String url) throws 
Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        String key1 = newKey();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+
+        MyClass value1 = new MyClass("a", 1);
+        objCache.create(key1, value1).join();
+
+        MyClass value2 = new MyClass("a", 2);
+
+        try {
+            objCache.create(key1, value2).join();
+            fail("should have failed to create");
+        } catch (CompletionException e) {
+            assertEquals(e.getCause().getClass(), 
AlreadyExistsException.class);
+        }
+
+        assertEquals(objCache.getIfCached(key1), Optional.of(value1));
+        assertEquals(objCache.get(key1).join(), Optional.of(value1));
+
+        objCache.delete(key1).join();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+        assertEquals(objCache.get(key1).join(), Optional.empty());
+    }
+
+    @Test(dataProvider = "impl")
+    public void insertionOutsideCache(String provider, String url) throws 
Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        String key1 = newKey();
+
+        MyClass value1 = new MyClass("a", 1);
+        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), 
Optional.of(-1L)).join();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+        assertEquals(objCache.get(key1).join(), Optional.of(value1));
+    }
+
+    @Test(dataProvider = "impl")
+    public void insertionOutsideCacheWithGenericType(String provider, String 
url) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+        MetadataCache<Map<String, String>> objCache = store
+                .getMetadataCache(new TypeReference<Map<String, String>>() {
+                });
+
+        String key1 = newKey();
+
+        Map<String, String> v = new TreeMap<>();
+        v.put("a", "1");
+        v.put("b", "2");
+        store.put(key1, 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(v), 
Optional.of(-1L)).join();
+
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+        assertEquals(objCache.get(key1).join(), Optional.of(v));
+    }
+
+    @Test(dataProvider = "impl")
+    public void invalidJsonContent(String provider, String url) throws 
Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        String key1 = newKey();
+
+        store.put(key1, "-------".getBytes(), Optional.of(-1L)).join();
+
+        try {
+            objCache.get(key1).join();
+            fail("should have failed to deserialize");
+        } catch (CompletionException e) {
+            assertEquals(e.getCause().getClass(), 
ContentDeserializationException.class);
+        }
+        assertEquals(objCache.getIfCached(key1), Optional.empty());
+    }
+
+    @Test(dataProvider = "impl")
+    public void readModifyUpdate(String provider, String url) throws Exception 
{
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        String key1 = newKey();
+
+        MyClass value1 = new MyClass("a", 1);
+        objCache.create(key1, value1).join();
+
+        objCache.readModifyUpdate(key1, v -> {
+            return new MyClass(v.a, v.b + 1);
+        }).join();
+
+        Optional<MyClass> newValue1 = objCache.get(key1).join();
+        assertTrue(newValue1.isPresent());
+        assertEquals(newValue1.get().a, "a");
+        assertEquals(newValue1.get().b, 2);
+
+        // Should fail if the key does not exist
+        try {
+            objCache.readModifyUpdate(newKey(), v -> {
+                return new MyClass(v.a, v.b + 1);
+            }).join();
+        } catch (CompletionException e) {
+            assertEquals(e.getCause().getClass(), NotFoundException.class);
+        }
+    }
+
+    @Test(dataProvider = "impl")
+    public void readModifyUpdateOrCreate(String provider, String url) throws 
Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        MetadataCache<MyClass> objCache = 
store.getMetadataCache(MyClass.class);
+
+        String key1 = newKey();
+
+        objCache.readModifyUpdateOrCreate(key1, optValue -> {
+            if (optValue.isPresent()) {
+                return new MyClass(optValue.get().a, optValue.get().b + 1);
+            } else {
+                return new MyClass("a", 1);
+            }
+        }).join();
+
+        Optional<MyClass> newValue1 = objCache.get(key1).join();
+        assertTrue(newValue1.isPresent());
+        assertEquals(newValue1.get().a, "a");
+        assertEquals(newValue1.get().b, 1);
+
+        objCache.readModifyUpdateOrCreate(key1, optValue -> {
+            assertTrue(optValue.isPresent());
+            return new MyClass(optValue.get().a, optValue.get().b + 1);
+        }).join();
+
+        newValue1 = objCache.get(key1).join();
+        assertTrue(newValue1.isPresent());
+        assertEquals(newValue1.get().a, "a");
+        assertEquals(newValue1.get().b, 2);
+    }
+
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 70f8ea3..1a8be1e 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -27,7 +28,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
@@ -38,31 +42,11 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
 import org.testng.annotations.Test;
 
-public class MetadataStoreTest {
-
-    private TestZKServer zks;
-
-    @BeforeClass
-    void setup() throws Exception {
-        zks = new TestZKServer();
-    }
-
-    @AfterClass
-    void teardown() throws Exception {
-        zks.close();
-    }
-
-    @DataProvider(name = "impl")
-    public Object[][] implementations() {
-        return new Object[][] {
-                { "ZooKeeper", zks.getConnectionString() },
-        };
-    }
+public class MetadataStoreTest extends BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl")
     public void emptyStoreTest(String provider, String url) throws Exception {
@@ -81,14 +65,14 @@ public class MetadataStoreTest {
             store.delete("/non-existing-key", Optional.empty()).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), NotFoundException.class);
+            assertException(e, NotFoundException.class);
         }
 
         try {
             store.delete("/non-existing-key", Optional.of(1L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), NotFoundException.class);
+            assertException(e, NotFoundException.class);
         }
     }
 
@@ -103,14 +87,14 @@ public class MetadataStoreTest {
             store.put(key1, "value-1".getBytes(), Optional.of(0L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), BadVersionException.class);
+            assertException(e, BadVersionException.class);
         }
 
         try {
             store.put(key1, "value-1".getBytes(), Optional.of(1L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), BadVersionException.class);
+            assertException(e, BadVersionException.class);
         }
 
         store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
@@ -125,16 +109,22 @@ public class MetadataStoreTest {
             store.put(key1, "value-2".getBytes(), Optional.of(-1L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), BadVersionException.class);
+            assertException(e, BadVersionException.class);
         }
 
         try {
             store.put(key1, "value-2".getBytes(), Optional.of(1L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), BadVersionException.class);
+            assertException(e, BadVersionException.class);
         }
 
+        assertTrue(store.exists(key1).join());
+        optRes = store.get(key1).join();
+        assertTrue(optRes.isPresent());
+        assertEquals(optRes.get().getValue(), "value-1".getBytes());
+        assertEquals(optRes.get().getStat().getVersion(), 0);
+
         store.put(key1, "value-2".getBytes(), Optional.of(0L)).join();
 
         assertTrue(store.exists(key1).join());
@@ -188,7 +178,7 @@ public class MetadataStoreTest {
             store.delete(key, Optional.empty()).join();
             fail("The key has children");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), 
MetadataStoreException.class);
+            assertException(e, MetadataStoreException.class);
         }
 
         for (int i = 0; i < N; i++) {
@@ -196,7 +186,7 @@ public class MetadataStoreTest {
                 store.delete(key + "/c-" + i, Optional.of(1L)).join();
                 fail("The key has children");
             } catch (CompletionException e) {
-                assertEquals(e.getCause().getClass(), 
BadVersionException.class);
+                assertException(e, BadVersionException.class);
             }
 
             store.delete(key + "/c-" + i, Optional.empty()).join();
@@ -212,39 +202,98 @@ public class MetadataStoreTest {
             store.delete("", Optional.empty()).join();
             fail("The key cannot be empty");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), 
MetadataStoreException.class);
+            assertException(e, MetadataStoreException.class);
         }
 
         try {
             store.getChildren("").join();
             fail("The key cannot be empty");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), 
MetadataStoreException.class);
+            assertException(e, MetadataStoreException.class);
         }
 
         try {
             store.get("").join();
             fail("The key cannot be empty");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), 
MetadataStoreException.class);
+            assertException(e, MetadataStoreException.class);
         }
 
         try {
             store.exists("").join();
             fail("The key cannot be empty");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), 
MetadataStoreException.class);
+            assertException(e, MetadataStoreException.class);
         }
 
         try {
             store.put("", new byte[0], Optional.empty()).join();
             fail("The key cannot be empty");
         } catch (CompletionException e) {
-            assertEquals(e.getCause().getClass(), 
MetadataStoreException.class);
+            assertException(e, MetadataStoreException.class);
         }
     }
 
-    private static String newKey() {
-        return "/key-" + System.nanoTime();
+    @Test(dataProvider = "impl")
+    public void notificationListeners(String provider, String url) throws 
Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, 
MetadataStoreConfig.builder().build());
+
+        BlockingQueue<Notification> notifications = new 
LinkedBlockingDeque<>();
+        store.registerListener(n -> {
+            notifications.add(n);
+        });
+
+        String key1 = newKey();
+
+        assertFalse(store.get(key1).join().isPresent());
+
+        // Trigger created notification
+        store.put(key1, "value-1".getBytes(), Optional.empty()).join();
+        assertTrue(store.get(key1).join().isPresent());
+        assertEquals(store.getChildren(key1).join(), Collections.emptyList());
+
+        Notification n = notifications.poll(3, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getType(), NotificationType.Created);
+        assertEquals(n.getPath(), key1);
+
+        // Trigger modified notification
+        store.put(key1, "value-2".getBytes(), Optional.empty()).join();
+        n = notifications.poll(3, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getType(), NotificationType.Modified);
+        assertEquals(n.getPath(), key1);
+
+        // Trigger modified notification on the parent
+        String key1Child = key1 + "/xx";
+
+        assertFalse(store.get(key1Child).join().isPresent());
+
+        store.put(key1Child, "value-2".getBytes(), Optional.empty()).join();
+        n = notifications.poll(3, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getType(), NotificationType.Created);
+        assertEquals(n.getPath(), key1Child);
+
+        n = notifications.poll(3, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getType(), NotificationType.ChildrenChanged);
+        assertEquals(n.getPath(), key1);
+
+        assertTrue(store.exists(key1Child).join());
+        assertEquals(store.getChildren(key1).join(), 
Collections.singletonList("xx"));
+
+        store.delete(key1Child, Optional.empty()).join();
+        n = notifications.poll(3, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getType(), NotificationType.Deleted);
+        assertEquals(n.getPath(), key1Child);
+
+        // Parent should be notified of the deletion
+        n = notifications.poll(3, TimeUnit.SECONDS);
+        assertNotNull(n);
+        assertEquals(n.getType(), NotificationType.ChildrenChanged);
+        assertEquals(n.getPath(), key1);
     }
 }

Reply via email to