This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d31863  Created PIP-45: Pluggable metadata interface (markdown)
4d31863 is described below

commit 4d31863cb233e12fc1d74d5097facfd5869b2d3b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 9 11:32:28 2019 -0700

    Created PIP-45: Pluggable metadata interface (markdown)
---
 PIP-45:-Pluggable-metadata-interface.md | 317 ++++++++++++++++++++++++++++++++
 1 file changed, 317 insertions(+)

diff --git a/PIP-45:-Pluggable-metadata-interface.md b/PIP-45:-Pluggable-metadata-interface.md
new file mode 100644
index 0000000..c8404d9
--- /dev/null
+++ b/PIP-45:-Pluggable-metadata-interface.md
@@ -0,0 +1,317 @@
+
+# PIP-45 - Pluggable metadata interface
+
+## Goals
+
+Provide a unified pluggable interface that can abstract all the Pulsar metadata
+interactions.
+
+After the refactoring, the default implementation will still be based on ZooKeeper and it will
+be 100% compatible with the existing metadata. The metadata will be kept in the same location
+and in the same exact format.
+
+Once we have the interface defined we could have multiple backend implementations:
+  * ZooKeeper
+  * Etcd
+  * In memory - for unit test purposes
+  * On local disk - for usage in Pulsar standalone
+
+## Context
+
+Pulsar currently uses ZooKeeper for metadata and coordination purposes. These accesses are
+made from Pulsar brokers and some administrative CLI tools. BookKeeper already supports
+a pluggable metadata store.
+
+Additionally, the ZooKeeper client API is accessed from several places in the codebase, so we
+first need to consolidate all these accesses through a single generic `MetadataStore` interface.
+
+This interface is based on the needs that Pulsar has in interacting with metadata and on the
+semantics offered by existing metadata stores (e.g. ZooKeeper, Etcd and others). The API will be
+considered "Beta" (meaning it could evolve in breaking ways) until we have at least a few
+concrete implementations. Therefore, at least initially, this will be an internal Pulsar API and
+it will not be open to user plugins.
+
+## Refactoring steps
+
+### 1. Define metadata store API
+
+The metadata store is modeled after a basic Key-Value interface with `compareAndSet()` updates
+based on the version of a particular value.
+
+```java
+public interface MetadataStore extends AutoCloseable {
+
+    /**
+     * Read the value of one key, identified by the path
+     *
+     * The async call will return a future that yields a {@link GetResult} that will contain the value and the
+     * associated {@link Stat} object.
+     *
+     * If the value is not found, the future will yield an empty {@link Optional}.
+     *
+     * @param path
+     *            the path of the key to get from the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<Optional<GetResult>> get(String path);
+
+    /**
+     * Return all the nodes (lexicographically sorted) that are children of the specified path.
+     *
+     * If the path itself does not exist, it will return an empty list.
+     *
+     * @param path
+     *            the path of the key to check on the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<List<String>> getChildren(String path);
+
+    /**
+     * Read whether a specific path exists.
+     *
+     * Note: In case of keys with multiple levels (eg: '/a/b/c'), checking the existence of a parent (eg. '/a') might
+     * not necessarily return true, unless the key had been explicitly created.
+     *
+     * @param path
+     *            the path of the key to check on the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<Boolean> exists(String path);
+
+    /**
+     * Put a new value for a given key.
+     *
+     * The caller can specify an expected version to be atomically checked against the current version of the stored
+     * data.
+     *
+     * The future will return the {@link Stat} object associated with the newly inserted value.
+     *
+     * @param path
+     *            the path of the key under which to store the value
+     * @param value
+     *            the value to store
+     * @param expectedVersion
+     *            if present, the version will have to match with the currently stored value for the operation to
+     *            succeed. Use -1 to require that the key does not already exist.
+     * @throws BadVersionException
+     *             if the expected version doesn't match the actual version of the data
+     * @return a future to track the async request
+     */
+    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion);
+
+    /**
+     * Delete the key-value pair identified by the path.
+     *
+     * @param path
+     *            the path of the key to delete from the store
+     * @param expectedVersion
+     *            if present, the version will have to match with the currently stored value for the operation to
+     *            succeed
+     * @throws NotFoundException
+     *             if the path is not found
+     * @throws BadVersionException
+     *             if the expected version doesn't match the actual version of the data
+     * @return a future to track the async request
+     */
+    CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);
+}
+```
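
As a rough illustration of the versioned `put()` semantics described above, the following is a minimal in-memory sketch and not the proposed implementation. The class `InMemoryStoreSketch`, the `Versioned` holder, and the choice of returning a bare version number instead of a `Stat` are all assumptions made for this example; it also assumes a missing key counts as version -1 and that the first write creates version 0.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store: only get/put are sketched, and Stat is reduced to a version number. */
class InMemoryStoreSketch {

    /** Exception name borrowed from the Javadoc above; its real shape is not defined there. */
    static class BadVersionException extends RuntimeException {
        BadVersionException(String message) {
            super(message);
        }
    }

    /** A stored value together with its version. */
    static final class Versioned {
        final byte[] value;
        final long version;

        Versioned(byte[] value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned> data = new ConcurrentHashMap<>();

    CompletableFuture<Optional<Versioned>> get(String path) {
        return CompletableFuture.completedFuture(Optional.ofNullable(data.get(path)));
    }

    /** Returns the new version. Assumes a missing key has version -1 and the first write creates version 0. */
    CompletableFuture<Long> put(String path, byte[] value, Optional<Long> expectedVersion) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        try {
            // compute() runs atomically per key, so the version check and the write cannot interleave.
            Versioned updated = data.compute(path, (key, current) -> {
                long currentVersion = (current == null) ? -1L : current.version;
                if (expectedVersion.isPresent() && expectedVersion.get() != currentVersion) {
                    throw new BadVersionException("expected " + expectedVersion.get() + ", found " + currentVersion);
                }
                return new Versioned(value, currentVersion + 1);
            });
            result.complete(updated.version);
        } catch (BadVersionException e) {
            // Failures are reported through the future rather than thrown to the caller.
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

With this sketch, a `put()` using `Optional.of(-1L)` succeeds only when the key is absent, and a second writer holding a stale version gets a future that completes exceptionally.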
+
+Additionally, when the `MetadataStore` is created, it should be possible to specify an observer
+function that will be triggered whenever there are changes on a sub-tree of the specified keys.
+
+This will be used to keep local caches updated without any polling.
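
One possible shape for this, sketched under assumptions: the proposal does not define the observer's signature, so the example below uses a plain `Consumer<String>` that receives the changed path, and both `NotifyingStoreSketch` and `LocalCacheSketch` are hypothetical names. It notifies per key rather than per sub-tree, to keep the sketch short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical store that reports every changed path to an observer supplied at creation time. */
class NotifyingStoreSketch {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final Consumer<String> observer;

    NotifyingStoreSketch(Consumer<String> observer) {
        this.observer = observer;
    }

    void put(String path, String value) {
        data.put(path, value);
        observer.accept(path); // push the change instead of letting caches poll
    }

    String get(String path) {
        return data.get(path);
    }
}

/** Hypothetical local cache: drops an entry when notified and reloads it lazily on the next read. */
class LocalCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private NotifyingStoreSketch store;
    int loads = 0; // counts reads that had to go to the store (for illustration only)

    void attach(NotifyingStoreSketch store) {
        this.store = store;
    }

    void invalidate(String path) {
        cache.remove(path);
    }

    String get(String path) {
        return cache.computeIfAbsent(path, p -> {
            loads++;
            return store.get(p);
        });
    }
}
```

The cache's `invalidate` method is passed as the observer when the store is built, so a write to the store removes the stale entry and the next read fetches the fresh value.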
+
+### 2. Define coordination interface
+
+Pulsar broker uses "coordination" in several different places. Examples are:
+
+ * List of active brokers and their current load data reports
+ * Acquiring ownership of a portion of a namespace's topics (bundle)
+ * Leader election (load manager)
+ * Counters for generating unique prefix identifiers
+
+While in general these can be implemented through a Key-Value interface with the help of
+flags (eg. "ephemeral" nodes in ZooKeeper), each backend system might have a more direct way
+to implement these.
+
+```java
+/**
+ * Interface for the coordination service. Provides abstraction for distributed locks and leader election.
+ */
+public interface CoordinationService extends AutoCloseable {
+
+    /**
+     * Read the content of an existing lock.
+     *
+     * Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
+     * guarantee that no one else also thinks it owns the same resource. The caller will have to deal with these race
+     * conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
+     *
+     * @param path
+     *            the path of the resource whose lock should be read
+     * @return a future that will track the completion of the operation
+     * @throws NotFoundException
+     *             if the lock is not taken
+     * @throws CoordinationServiceException
+     *             if there's a failure in reading the lock
+    CompletableFuture<Optional<byte[]>> readLock(String path);
+
+    /**
+     * Acquire a lock on a shared resource.
+     *
+     * If the lock is already taken, this operation will fail immediately.
+     *
+     * Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
+     * guarantee that no one else also thinks it owns the same resource. The caller will have to deal with these race
+     * conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
+     *
+     * @param path
+     *            the path of the resource on which to acquire the lock
+     * @param content
+     *            the payload of the lock
+     * @return a future that will track the completion of the operation
+     * @throws ResourceBusyException
+     *             if the lock is already taken
+     * @throws CoordinationServiceException
+     *             if there's a failure in acquiring the lock
+     */
+    CompletableFuture<ResourceLock> acquireLock(String path, byte[] content);
+
+    /**
+     * List all the locks that are children of a specific path.
+     *
+     * For example, given locks: <code>/a/b/lock-1</code> and <code>/a/b/lock-2</code>, the <code>listLocks()</code>
+     * will return a list of <code>["lock-1", "lock-2"]</code>.
+     *
+     * @param path
+     *            the prefix path to get the list of locks
+     * @return a future that will track the completion of the operation
+     * @throws CoordinationServiceException
+     *             if there's a failure in getting the list of locks
+     */
+    CompletableFuture<List<String>> listLocks(String path);
+
+    /**
+     * Try to become the leader for the specified resource.
+     *
+     * If there's already a leader, this request will be kept pending until the current process becomes the
+     * leader.
+     *
+     * Warning: because of the distributed nature of the leader election, having been promoted to "leader" status will
+     * never provide a strong guarantee that no one else also thinks it's the leader. The caller will have to deal with
+     * these race conditions when using the resource itself (eg. using compareAndSet() or fencing mechanisms).
+     *
+     * @param path
+     *            the path of the resource of which to become the leader
+     * @param content
+     *            the payload of the lock
+     * @return a future that will track the completion of the operation
+     * @throws CoordinationServiceException
+     *             if there's a failure in the leader election
+     */
+    CompletableFuture<ResourceLock> becomeLeader(String path, byte[] content);
+
+    /**
+     * Increment a counter identified by the specified path and return the current value.
+     *
+     * The counter value will be guaranteed to be unique within the context of the path.
+     *
+     * @param path
+     *            the path that identifies a particular counter
+     * @return a future that will track the completion of the operation
+     * @throws CoordinationServiceException
+     *             if there's a failure in incrementing the counter
+     */
+    CompletableFuture<Long> getNextCounterValue(String path);
+}
+
+
+/**
+ * Represent a lock that the current process has on a shared resource.
+ */
+public interface ResourceLock {
+
+    /**
+     * @return the content associated with the lock
+     */
+    byte[] getContent();
+
+    /**
+     * Release the lock on the resource.
+     *
+     * @return a future to track when the release operation is complete
+     */
+    CompletableFuture<Void> release();
+
+    /**
+     * Get a future that can be used to get notified when the lock is no longer valid.
+     *
+     * Note: the future will not be triggered when the lock is voluntarily released.
+     *
+     * @return a future to get notification if the lock is expired
+     */
+    CompletableFuture<Void> getLockExpiredFuture();
+}
+```
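
To make the "fail immediately if already taken" behavior of `acquireLock()` concrete, here is a single-process sketch under stated assumptions. `LockServiceSketch` and its nested `Lock` class are hypothetical stand-ins for `CoordinationService` and `ResourceLock`. A real backend would also need session expiry (for example ZooKeeper ephemeral nodes), which this sketch leaves out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical single-process lock table illustrating acquire/release semantics only. */
class LockServiceSketch {

    /** Exception name borrowed from the Javadoc above; its real shape is not defined there. */
    static class ResourceBusyException extends RuntimeException {
        ResourceBusyException(String path) {
            super("lock already taken: " + path);
        }
    }

    /** Handle returned to the owner of a lock. */
    final class Lock {
        private final String path;
        private final byte[] content;

        Lock(String path, byte[] content) {
            this.path = path;
            this.content = content;
        }

        byte[] getContent() {
            return content;
        }

        CompletableFuture<Void> release() {
            // Remove only if this handle's content is still the registered one (arrays compare by identity).
            locks.remove(path, content);
            return CompletableFuture.completedFuture(null);
        }
    }

    private final Map<String, byte[]> locks = new ConcurrentHashMap<>();

    CompletableFuture<Lock> acquireLock(String path, byte[] content) {
        CompletableFuture<Lock> result = new CompletableFuture<>();
        // putIfAbsent is atomic: exactly one caller sees null and becomes the owner.
        if (locks.putIfAbsent(path, content) == null) {
            result.complete(new Lock(path, content));
        } else {
            result.completeExceptionally(new ResourceBusyException(path));
        }
        return result;
    }
}
```

A second `acquireLock()` on the same path completes exceptionally until the first owner calls `release()`. As the warnings above note, callers should still guard the protected resource with `compareAndSet()` or fencing.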
+
+### 3. Port ManagedLedger to use MetadataStore
+
+ManagedLedger is already using an abstraction for metadata access (see `MetaStore`). It will be
+easy to convert that to the `MetadataStore` API.
+
+### 4. Define metadata cache API
+
+Currently, most metadata read accesses happen through the `ZooKeeperCache` and additional
+classes based on it, like `ZooKeeperDataCache` and `ZooKeeperChildrenCache`.
+
+The cache needs to be ported to use the `MetadataStore` API, along with the support for receiving
+notifications and invalidating the stale entries.
+
+An additional concept that should be added to the cache is the atomic "read-modify-update" operation.
+
+This is currently being performed from many places in the code base and it should be consolidated
+into a single implementation. For example:
+
+```java
+public interface TypedMetadataCache<T> {
+    // ...
+
+    /**
+     * Perform an atomic read-modify-update of the value.
+     *
+     * The modify function can potentially be called multiple times if there are concurrent updates happening.
+     *
+     * @param path
+     *            the path of the value
+     * @param modifyFunction
+     *            a function that will be passed the current value and returns a modified value to be stored
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction);
+}
+```
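
The reason the modify function may run more than once is the optimistic retry loop behind such an operation. The following synchronous sketch shows that loop under assumptions: `ReadModifyUpdateSketch` is a hypothetical class, values are plain strings, and the backing store is an in-memory map rather than a `MetadataStore`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical synchronous sketch of an optimistic read-modify-update loop. */
class ReadModifyUpdateSketch {

    /** Immutable value/version pair; a new instance is created for every successful write. */
    static final class Versioned {
        final String value;
        final long version;

        Versioned(String value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Versioned> data = new ConcurrentHashMap<>();

    /** Succeeds only if the stored entry is still the one that was read (null means "must not exist"). */
    private boolean compareAndSet(String path, Versioned expected, String newValue) {
        Versioned next = new Versioned(newValue, expected == null ? 0 : expected.version + 1);
        return expected == null
                ? data.putIfAbsent(path, next) == null
                : data.replace(path, expected, next);
    }

    /** The modify function receives null when the key does not exist yet. */
    void readModifyUpdate(String path, Function<String, String> modifyFunction) {
        while (true) {
            Versioned current = data.get(path);
            String modified = modifyFunction.apply(current == null ? null : current.value);
            if (compareAndSet(path, current, modified)) {
                return;
            }
            // Another writer got in first: re-read and apply the function again.
        }
    }

    String get(String path) {
        Versioned current = data.get(path);
        return current == null ? null : current.value;
    }
}
```

Because the function can be re-applied after a lost race, it should be free of side effects and depend only on the value it is given.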
+
+### 5. Define higher level abstraction for metadata
+
+While the ZooKeeperCache is already typed (in order to store the object already deserialized), we
+should have an additional abstraction layer to mediate the access to the metadata.
+
+For example, to get the list of tenants in a cluster, we shouldn't use `MetadataStore.getChildren()`
+directly from multiple places. Rather, we need to provide a `ConfigurationStore` interface such as:
+
+```java
+public interface ConfigurationStore {
+    CompletableFuture<List<String>> getTenants();
+
+    CompletableFuture<List<String>> getNamespaces(String tenant);
+
+    // ....
+}
+```
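
A sketch of how such an interface might sit on top of a `getChildren()`-style lookup, so that the path layout lives in one place. The class name `ConfigurationStoreSketch` and the `/admin/policies` root used here are assumptions for illustration; the proposal does not specify the layout, and the lookup is passed in as a plain function instead of a `MetadataStore`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical adapter that hides metadata paths behind tenant/namespace methods. */
class ConfigurationStoreSketch {

    // Assumed root path, for illustration only.
    private static final String POLICIES_ROOT = "/admin/policies";

    // Stand-in for MetadataStore.getChildren(path).
    private final Function<String, CompletableFuture<List<String>>> getChildren;

    ConfigurationStoreSketch(Function<String, CompletableFuture<List<String>>> getChildren) {
        this.getChildren = getChildren;
    }

    CompletableFuture<List<String>> getTenants() {
        return getChildren.apply(POLICIES_ROOT);
    }

    CompletableFuture<List<String>> getNamespaces(String tenant) {
        return getChildren.apply(POLICIES_ROOT + "/" + tenant);
    }
}
```

Callers then depend only on `getTenants()` and `getNamespaces()`, and a change to the storage layout touches a single class.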
