This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git
The following commit(s) were added to refs/heads/master by this push:
new 4d31863 Created PIP-45: Pluggable metadata interface (markdown)
4d31863 is described below
commit 4d31863cb233e12fc1d74d5097facfd5869b2d3b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Oct 9 11:32:28 2019 -0700
Created PIP-45: Pluggable metadata interface (markdown)
---
PIP-45:-Pluggable-metadata-interface.md | 317 ++++++++++++++++++++++++++++++++
1 file changed, 317 insertions(+)
diff --git a/PIP-45:-Pluggable-metadata-interface.md b/PIP-45:-Pluggable-metadata-interface.md
new file mode 100644
index 0000000..c8404d9
--- /dev/null
+++ b/PIP-45:-Pluggable-metadata-interface.md
@@ -0,0 +1,317 @@
+
+# PIP-45 - Pluggable metadata interface
+
+## Goals
+
+Provide a unified pluggable interface that can abstract all the Pulsar metadata
+interactions.
+
+After the refactoring, the default implementation will still be based on ZooKeeper and it will
+be 100% compatible with the existing metadata. The metadata will be kept in the same location
+and in exactly the same format.
+
+Once the interface is defined, we could have multiple backend implementations:
+ * ZooKeeper
+ * Etcd
+ * In memory - for unit-testing purposes
+ * On local disk - for usage in Pulsar standalone
+
+## Context
+
+Pulsar currently uses ZooKeeper for metadata and coordination purposes. ZooKeeper is accessed
+from Pulsar brokers and from some administrative CLI tools. BookKeeper already supports
+a pluggable metadata store.
+
+Additionally, the ZooKeeper client API is accessed from several places in the codebase, so we
+first need to consolidate all these accesses behind a single generic `MetadataStore` interface.
+
+This interface is based on Pulsar's needs when interacting with metadata and on the
+semantics offered by existing metadata stores (e.g. ZooKeeper, Etcd and others). The API will be
+considered "Beta" (meaning it could evolve in breaking ways) until we have at least a few
+concrete implementations. Therefore, at least initially, this will be an internal Pulsar API and
+it will not be open to user plugins.
+
+## Refactoring steps
+
+### 1. Define metadata store API
+
+The metadata store is modeled after a basic Key-Value interface with `compareAndSet()` updates
+based on the version of a particular value.
+
+```java
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+public interface MetadataStore extends AutoCloseable {
+
+    /**
+     * Read the value of one key, identified by the path.
+     *
+     * The async call will return a future that yields a {@link GetResult} that will contain the value and the
+     * associated {@link Stat} object.
+     *
+     * If the value is not found, the future will yield an empty {@link Optional}.
+     *
+     * @param path
+     *            the path of the key to get from the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<Optional<GetResult>> get(String path);
+
+    /**
+     * Return all the nodes (lexicographically sorted) that are children of the specific path.
+     *
+     * If the path itself does not exist, it will return an empty list.
+     *
+     * @param path
+     *            the path whose children should be listed
+     * @return a future to track the async request
+     */
+    CompletableFuture<List<String>> getChildren(String path);
+
+    /**
+     * Read whether a specific path exists.
+     *
+     * Note: In case of keys with multiple levels (e.g. '/a/b/c'), checking the existence of a parent (e.g. '/a')
+     * might not necessarily return true, unless the key had been explicitly created.
+     *
+     * @param path
+     *            the path of the key to check on the store
+     * @return a future to track the async request
+     */
+    CompletableFuture<Boolean> exists(String path);
+
+    /**
+     * Put a new value for a given key.
+     *
+     * The caller can specify an expected version to be atomically checked against the current version of the stored
+     * data.
+     *
+     * The future will return the {@link Stat} object associated with the newly inserted value.
+     *
+     * @param path
+     *            the path of the key to put into the store
+     * @param value
+     *            the value to store
+     * @param expectedVersion
+     *            if present, the version must match the version of the currently stored value for the operation to
+     *            succeed. Use -1 to enforce a non-existing value.
+     * @return a future to track the async request
+     * @throws BadVersionException
+     *             if the expected version doesn't match the actual version of the data
+     */
+    CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion);
+
+    /**
+     * Delete the key identified by the path.
+     *
+     * @param path
+     *            the path of the key to delete from the store
+     * @param expectedVersion
+     *            if present, the version must match the version of the currently stored value for the operation to
+     *            succeed
+     * @return a future to track the async request
+     * @throws NotFoundException
+     *             if the path is not found
+     * @throws BadVersionException
+     *             if the expected version doesn't match the actual version of the data
+     */
+    CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);
+}
+```
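The following is a minimal, hypothetical sketch of what the "in memory" backend could look like. It illustrates the version semantics described above. `InMemoryMetadataStore` deliberately does not implement the interface, so the snippet compiles on its own. The proposal does not define `Stat`, `GetResult` or the exception classes, so their shapes here are assumptions. Starting versions at 0 mirrors ZooKeeper but is otherwise an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical value types: the proposal uses Stat and GetResult without
// defining them, so these minimal shapes are assumptions.
final class Stat {
    final String path;
    final long version;
    Stat(String path, long version) { this.path = path; this.version = version; }
}

final class GetResult {
    final byte[] value;
    final Stat stat;
    GetResult(byte[] value, Stat stat) { this.value = value; this.stat = stat; }
}

class BadVersionException extends RuntimeException {
    BadVersionException(String path) { super("Version mismatch on " + path); }
}

class NotFoundException extends RuntimeException {
    NotFoundException(String path) { super("Not found: " + path); }
}

// Sketch of the "in memory" backend: a sorted map from path to (value, version).
class InMemoryMetadataStore {
    private static final class Entry {
        final byte[] value;
        final long version;
        Entry(byte[] value, long version) { this.value = value; this.version = version; }
    }

    private final TreeMap<String, Entry> data = new TreeMap<>();

    public synchronized CompletableFuture<Optional<GetResult>> get(String path) {
        Entry e = data.get(path);
        Optional<GetResult> result = e == null ? Optional.empty()
                : Optional.of(new GetResult(e.value, new Stat(path, e.version)));
        return CompletableFuture.completedFuture(result);
    }

    public synchronized CompletableFuture<List<String>> getChildren(String path) {
        String prefix = path.endsWith("/") ? path : path + "/";
        TreeSet<String> children = new TreeSet<>(); // sorted and de-duplicated
        for (String key : data.tailMap(prefix, true).keySet()) {
            if (!key.startsWith(prefix)) {
                break; // keys sharing a prefix are contiguous in sorted order
            }
            String rest = key.substring(prefix.length());
            int slash = rest.indexOf('/');
            if (!rest.isEmpty()) {
                children.add(slash < 0 ? rest : rest.substring(0, slash));
            }
        }
        return CompletableFuture.completedFuture(new ArrayList<>(children));
    }

    public synchronized CompletableFuture<Boolean> exists(String path) {
        return CompletableFuture.completedFuture(data.containsKey(path));
    }

    public synchronized CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion) {
        Entry current = data.get(path);
        if (expectedVersion.isPresent()) {
            long expected = expectedVersion.get();
            // -1 means "the key must not exist yet"
            boolean ok = expected == -1 ? current == null
                    : current != null && current.version == expected;
            if (!ok) {
                return CompletableFuture.failedFuture(new BadVersionException(path));
            }
        }
        long newVersion = current == null ? 0 : current.version + 1;
        data.put(path, new Entry(value, newVersion));
        return CompletableFuture.completedFuture(new Stat(path, newVersion));
    }

    public synchronized CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
        Entry current = data.get(path);
        if (current == null) {
            return CompletableFuture.failedFuture(new NotFoundException(path));
        }
        if (expectedVersion.isPresent() && expectedVersion.get() != current.version) {
            return CompletableFuture.failedFuture(new BadVersionException(path));
        }
        data.remove(path);
        return CompletableFuture.completedFuture(null);
    }
}
```

Keeping keys in a sorted map turns `getChildren()` into a range scan over the `path + "/"` prefix. The sketch also shows why `exists("/a")` can be false while `/a/b` is present: parent keys are never created implicitly.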
+
+Additionally, when the `MetadataStore` is created, it should be possible to specify an observer
+function that will be triggered whenever there are changes in the sub-tree of any of the specified keys.
+
+This will be used to keep local caches updated without any polling.
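The following minimal sketch shows how such an observer could be dispatched. It assumes a hypothetical `Notification` type and a `Consumer` callback, since the proposal names neither. It illustrates the sub-tree match: a change to `/a/b` reaches an observer registered on `/a`, while a change to `/ab` does not.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event types; the proposal only mentions "an observer function".
enum NotificationType { CREATED, MODIFIED, DELETED }

final class Notification {
    final NotificationType type;
    final String path;
    Notification(NotificationType type, String path) { this.type = type; this.path = path; }
}

// Dispatches store changes to observers registered on a sub-tree root.
class SubtreeNotifier {
    private static final class Registration {
        final String root;
        final Consumer<Notification> observer;
        Registration(String root, Consumer<Notification> observer) {
            this.root = root;
            this.observer = observer;
        }
    }

    // Copy-on-write: registrations are rare, dispatches are frequent.
    private final List<Registration> registrations = new CopyOnWriteArrayList<>();

    void register(String root, Consumer<Notification> observer) {
        registrations.add(new Registration(root, observer));
    }

    // Meant to be called by the store after every successful put/delete.
    void fire(Notification n) {
        for (Registration r : registrations) {
            if (isInSubtree(r.root, n.path)) {
                r.observer.accept(n);
            }
        }
    }

    // "/a/b" is in the sub-tree of "/a", but "/ab" is not.
    static boolean isInSubtree(String root, String path) {
        if (root.equals("/")) {
            return path.startsWith("/");
        }
        return path.equals(root) || path.startsWith(root + "/");
    }
}
```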
+
+### 2. Define coordination interface
+
+The Pulsar broker uses "coordination" in several places. For example:
+
+ * List of active brokers and their current load data reports
+ * Ownership of a portion of a namespace's topics (a bundle)
+ * Leader election (load manager)
+ * Counters for generating unique prefix identifiers
+
+While in general these can be implemented on top of a Key-Value interface with the help of
+flags (e.g. "ephemeral" nodes in ZooKeeper), each backend system might offer a more direct way
+to implement them.
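This hypothetical in-memory sketch shows how the first two items can map onto a Key-Value store with "ephemeral" flags. `EphemeralRegistry` and its methods are made-up names for illustration. Each key remembers the session that created it, and closing the session removes the key. That turns a plain listing into a list of live brokers. Refusing to create a key that already exists gives a simple form of ownership.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Emulates ZooKeeper-style "ephemeral" keys on top of a plain map: each key
// records the session that created it and is removed when that session ends.
class EphemeralRegistry {
    private final TreeMap<String, byte[]> data = new TreeMap<>();
    private final Map<String, Set<String>> keysBySession = new HashMap<>();

    // Returns false if the key already exists, i.e. someone else owns it.
    synchronized boolean createEphemeral(String sessionId, String path, byte[] value) {
        if (data.containsKey(path)) {
            return false;
        }
        data.put(path, value);
        keysBySession.computeIfAbsent(sessionId, s -> new HashSet<>()).add(path);
        return true;
    }

    // In a real backend this happens on session expiry (e.g. a broker crash).
    synchronized void closeSession(String sessionId) {
        Set<String> owned = keysBySession.remove(sessionId);
        if (owned != null) {
            owned.forEach(data::remove);
        }
    }

    // Names of the keys stored under prefix (a flat layout is assumed), sorted.
    synchronized List<String> list(String prefix) {
        String p = prefix.endsWith("/") ? prefix : prefix + "/";
        List<String> names = new ArrayList<>();
        for (String key : data.tailMap(p, true).keySet()) {
            if (!key.startsWith(p)) {
                break;
            }
            names.add(key.substring(p.length()));
        }
        return names;
    }
}
```

A backend with native leases, such as Etcd, could plausibly map the session onto a lease instead of tracking keys per session by hand.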
+
+```java
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for the coordination service. Provides abstractions for distributed locks, leader election and counters.
+ */
+public interface CoordinationService extends AutoCloseable {
+
+    /**
+     * Read the content of an existing lock.
+     *
+     * @param path
+     *            the path of the resource whose lock should be read
+     * @return a future that will track the completion of the operation
+     * @throws NotFoundException
+     *             if the lock is not taken
+     * @throws CoordinationServiceException
+     *             if there's a failure in reading the lock
+     */
+    CompletableFuture<Optional<byte[]>> readLock(String path);
+
+    /**
+     * Acquire a lock on a shared resource.
+     *
+     * If the lock is already taken, this operation will fail immediately.
+     *
+     * Warning: because of the distributed nature of the lock, having acquired a lock will never provide a strong
+     * guarantee that no one else also thinks it owns the same resource. The caller will have to deal with these race
+     * conditions when using the resource itself (e.g. using compareAndSet() or fencing mechanisms).
+     *
+     * @param path
+     *            the path of the resource on which to acquire the lock
+     * @param content
+     *            the payload of the lock
+     * @return a future that will track the completion of the operation
+     * @throws ResourceBusyException
+     *             if the lock is already taken
+     * @throws CoordinationServiceException
+     *             if there's a failure in acquiring the lock
+     */
+    CompletableFuture<ResourceLock> acquireLock(String path, byte[] content);
+
+    /**
+     * List all the locks that are children of a specific path.
+     *
+     * For example, given locks <code>/a/b/lock-1</code> and <code>/a/b/lock-2</code>,
+     * <code>listLocks("/a/b")</code> will return a list of <code>["lock-1", "lock-2"]</code>.
+     *
+     * @param path
+     *            the prefix path to get the list of locks
+     * @return a future that will track the completion of the operation
+     * @throws CoordinationServiceException
+     *             if there's a failure in getting the list of locks
+     */
+    CompletableFuture<List<String>> listLocks(String path);
+
+    /**
+     * Try to become the leader for the specified resource.
+     *
+     * If there's already a leader, this request will be kept pending until the current process becomes the
+     * leader.
+     *
+     * Warning: because of the distributed nature of the leader election, having been promoted to "leader" status will
+     * never provide a strong guarantee that no one else also thinks it's the leader. The caller will have to deal with
+     * these race conditions when using the resource itself (e.g. using compareAndSet() or fencing mechanisms).
+     *
+     * @param path
+     *            the path of the resource of which to become the leader
+     * @param content
+     *            the payload of the lock
+     * @return a future that will track the completion of the operation
+     * @throws CoordinationServiceException
+     *             if there's a failure in the leader election
+     */
+    CompletableFuture<ResourceLock> becomeLeader(String path, byte[] content);
+
+    /**
+     * Increment a counter identified by the specified path and return the current value.
+     *
+     * The counter value will be guaranteed to be unique within the context of the path.
+     *
+     * @param path
+     *            the path that identifies a particular counter
+     * @return a future that will track the completion of the operation
+     * @throws CoordinationServiceException
+     *             if there's a failure in incrementing the counter
+     */
+    CompletableFuture<Long> getNextCounterValue(String path);
+}
+
+/**
+ * Represents a lock that the current process holds on a shared resource.
+ */
+public interface ResourceLock {
+
+    /**
+     * @return the content associated with the lock
+     */
+    byte[] getContent();
+
+    /**
+     * Release the lock on the resource.
+     *
+     * @return a future to track when the release operation is complete
+     */
+    CompletableFuture<Void> release();
+
+    /**
+     * Get a future that can be used to get notified when the lock is no longer valid.
+     *
+     * Note: the future will not be triggered when the lock is voluntarily released.
+     *
+     * @return a future that completes if the lock expires
+     */
+    CompletableFuture<Void> getLockExpiredFuture();
+}
+```
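The lock contract can be sketched in a single process: acquiring fails immediately when the lock is taken, and a voluntary release does not trigger the expiry future. This sketch is hypothetical. `InMemoryLockManager`, `SimpleResourceLock` and the `expire()` hook are illustrative names. The lock class mirrors the `ResourceLock` methods without implementing the interface. A real backend would revoke locks on session expiry rather than through an explicit call.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ResourceBusyException extends RuntimeException {
    ResourceBusyException(String path) { super("Lock already taken: " + path); }
}

// Hypothetical single-process sketch of acquireLock() semantics.
class InMemoryLockManager {
    private final Map<String, SimpleResourceLock> locks = new ConcurrentHashMap<>();

    CompletableFuture<SimpleResourceLock> acquireLock(String path, byte[] content) {
        SimpleResourceLock lock = new SimpleResourceLock(this, path, content);
        // putIfAbsent makes "check if free, then take it" a single atomic step
        if (locks.putIfAbsent(path, lock) != null) {
            return CompletableFuture.failedFuture(new ResourceBusyException(path));
        }
        return CompletableFuture.completedFuture(lock);
    }

    // Simulates the backend revoking a lock, e.g. on session expiry.
    void expire(String path) {
        SimpleResourceLock lock = locks.remove(path);
        if (lock != null) {
            lock.expiredFuture.complete(null);
        }
    }

    void releaseInternal(String path, SimpleResourceLock lock) {
        locks.remove(path, lock); // removes only if this holder still owns it
    }
}

class SimpleResourceLock {
    private final InMemoryLockManager manager;
    private final String path;
    private final byte[] content;
    final CompletableFuture<Void> expiredFuture = new CompletableFuture<>();

    SimpleResourceLock(InMemoryLockManager manager, String path, byte[] content) {
        this.manager = manager;
        this.path = path;
        this.content = content;
    }

    byte[] getContent() { return content; }

    CompletableFuture<Void> release() {
        // A voluntary release deliberately leaves expiredFuture untouched.
        manager.releaseInternal(path, this);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> getLockExpiredFuture() { return expiredFuture; }
}
```

`ConcurrentHashMap.putIfAbsent` makes the acquisition atomic. `release()` relies on the two-argument `remove(key, value)`, so a stale holder cannot drop a lock that someone else has since re-acquired.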
+
+### 3. Port ManagedLedger to use MetadataStore
+
+ManagedLedger already uses an abstraction for metadata access (see `MetaStore`), so it should be
+straightforward to convert it to the `MetadataStore` API.
+
+### 4. Define metadata cache API
+
+Currently, most metadata reads go through the `ZooKeeperCache` and the classes built on it,
+such as `ZooKeeperDataCache` and `ZooKeeperChildrenCache`.
+
+The cache needs to be ported to the `MetadataStore` API, along with support for receiving
+notifications and invalidating stale entries.
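The following minimal sketch shows a read-through cache driven by those notifications. It assumes a hypothetical loader function that stands in for `MetadataStore.get()`, plus an `onNotification(path)` hook that would be wired to the store's observer. It caches the future rather than the value, so concurrent readers of a missing entry share a single load.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical read-through cache: entries are loaded once from the store and
// dropped when the store reports a change, so reads never need polling.
class InvalidatingCache<T> {
    private final Function<String, CompletableFuture<Optional<T>>> loader;
    private final ConcurrentHashMap<String, CompletableFuture<Optional<T>>> entries =
            new ConcurrentHashMap<>();

    InvalidatingCache(Function<String, CompletableFuture<Optional<T>>> loader) {
        this.loader = loader;
    }

    CompletableFuture<Optional<T>> get(String path) {
        // computeIfAbsent is atomic, so concurrent misses trigger one load
        CompletableFuture<Optional<T>> f = entries.computeIfAbsent(path, loader);
        // Don't keep failed reads around; the next get() will retry.
        f.whenComplete((v, ex) -> {
            if (ex != null) {
                entries.remove(path, f);
            }
        });
        return f;
    }

    // Wire this to the store's observer for created/modified/deleted paths.
    void onNotification(String path) {
        entries.remove(path);
    }
}
```

This sketch has a known gap. A change that lands while a load is still in flight can leave a stale entry behind, so a production cache would need to order invalidations against pending loads.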
+
+An additional concept that should be added to the cache is the atomic "read-modify-update" operation.
+
+This operation is currently implemented in many places in the codebase and should be consolidated
+into a single implementation. For example:
+
+```java
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+public interface TypedMetadataCache<T> {
+    // ...
+
+    /**
+     * Perform an atomic read-modify-update of the value.
+     *
+     * The modify function can potentially be called multiple times if there are concurrent updates happening.
+     *
+     * @param path
+     *            the path of the value
+     * @param modifyFunction
+     *            a function that will be passed the current value and returns a modified value to be stored
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction);
+}
+```
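One way to implement `readModifyUpdate()` is an optimistic compare-and-set loop over versioned reads and writes. This is also why the modify function may run more than once. The sketch below rests on several assumptions:

* It uses a hypothetical `VersionedStore` whose `tryWrite()` returns `false` on a version conflict. A `MetadataStore`-based version would map `BadVersionException` to a retry instead.
* It passes `null` to the function when the key does not exist yet, which the proposal does not specify.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// A value together with the version it was read at.
final class Versioned<T> {
    final T value;
    final long version;
    Versioned(T value, long version) { this.value = value; this.version = version; }
}

// Hypothetical minimal store: tryWrite() succeeds only if the stored version
// still equals expectedVersion (-1 meaning "must not exist yet").
interface VersionedStore<T> {
    CompletableFuture<Optional<Versioned<T>>> read(String path);

    CompletableFuture<Boolean> tryWrite(String path, T value, long expectedVersion);
}

final class ReadModifyUpdate {
    private ReadModifyUpdate() {}

    // Read, apply modifyFunction, and write back with a version check; on a
    // conflict, start over from a fresh read. modifyFunction may therefore run
    // several times.
    static <T> CompletableFuture<Void> readModifyUpdate(VersionedStore<T> store, String path,
                                                        Function<T, T> modifyFunction) {
        return store.read(path).thenCompose(current -> {
            T oldValue = current.map(v -> v.value).orElse(null);
            long expected = current.map(v -> v.version).orElse(-1L);
            T newValue = modifyFunction.apply(oldValue);
            return store.tryWrite(path, newValue, expected).thenCompose(ok -> ok
                    ? CompletableFuture.<Void>completedFuture(null)
                    : readModifyUpdate(store, path, modifyFunction));
        });
    }
}
```

Because every retry starts from a fresh read, `modifyFunction` should be a pure function of its input. Any side effects inside it could happen more than once.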
+
+### 5. Define higher level abstraction for metadata
+
+While the `ZooKeeperCache` is already typed (so that it stores already-deserialized objects), we
+should have an additional abstraction layer to mediate access to the metadata.
+
+For example, to get the list of tenants in a cluster, we shouldn't use `MetadataStore.getChildren()`
+directly from multiple places. Rather, we need to provide a `ConfigurationStore` interface such as:
+
+```java
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public interface ConfigurationStore {
+    CompletableFuture<List<String>> getTenants();
+
+    CompletableFuture<List<String>> getNamespaces(String tenant);
+
+    // ....
+}
+```
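Here is one possible shape for an implementation that keeps the path layout in a single place. It is sketched under several assumptions:

* `MetadataConfigurationStore` is a hypothetical name.
* It takes a `getChildren`-like function instead of a full `MetadataStore`, so the snippet stands alone.
* The `<root>/<tenant>/<namespace>` layout is assumed rather than taken from the proposal.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical: the only class that knows where tenants and namespaces live in
// the metadata tree. Callers ask for "tenants", never for raw paths.
class MetadataConfigurationStore {
    private final Function<String, CompletableFuture<List<String>>> getChildren;
    private final String tenantsRoot; // assumed layout: <root>/<tenant>/<namespace>

    MetadataConfigurationStore(Function<String, CompletableFuture<List<String>>> getChildren,
                               String tenantsRoot) {
        this.getChildren = getChildren;
        this.tenantsRoot = tenantsRoot;
    }

    CompletableFuture<List<String>> getTenants() {
        return getChildren.apply(tenantsRoot);
    }

    CompletableFuture<List<String>> getNamespaces(String tenant) {
        // Reject names that would escape the tenant's sub-tree.
        if (tenant.isEmpty() || tenant.contains("/")) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("Invalid tenant: " + tenant));
        }
        return getChildren.apply(tenantsRoot + "/" + tenant);
    }
}
```

Callers then depend only on `getTenants()` and `getNamespaces()`, so changing the tree layout, or the backend, touches a single class.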