This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 590e1331d9a [improve][pip] PIP-378 Add ServiceUnitStateTableView 
abstraction (ExtensibleLoadMangerImpl only) (#23300)
590e1331d9a is described below

commit 590e1331d9a4a2b62297f155131c1641037ca3b1
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Tue Sep 17 12:27:11 2024 -0700

    [improve][pip] PIP-378 Add ServiceUnitStateTableView abstraction 
(ExtensibleLoadMangerImpl only) (#23300)
---
 pip/pip-378.md | 321 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 321 insertions(+)

diff --git a/pip/pip-378.md b/pip/pip-378.md
new file mode 100644
index 00000000000..352c7fa560d
--- /dev/null
+++ b/pip/pip-378.md
@@ -0,0 +1,321 @@
+# PIP-378: Add ServiceUnitStateTableView abstraction (ExtensibleLoadManagerImpl only)
+
+## Background
+
+### ExtensibleLoadManagerImpl uses system topics to event-source bundle ownerships
+
+PIP-192 introduces a new broker load balancer using a persistent system topic 
to event-source bundle ownerships among brokers.
+
+PIP-307 introduces a graceful ownership change protocol over the system topic (from PIP-192).
+
+However, using system topics to manage bundle ownerships may not always be the 
best choice. Users might need an alternative approach to event-source bundle 
ownerships.
+
+
+## Motivation
+
+Add a `ServiceUnitStateTableView` abstraction and make it pluggable, so users can customize `ServiceUnitStateTableView` implementations and event-source bundle ownerships using other stores.
+
+## Goals
+
+### In Scope
+
+- Add `ServiceUnitStateTableView` interface
+- Add `ServiceUnitStateTableViewImpl` implementation that uses a Pulsar system topic (compatible with existing behavior)
+- Add `ServiceUnitStateMetadataStoreTableViewImpl` implementation that uses the Pulsar metadata store (new behavior)
+- Refactor related code and test code
+
+## High-Level Design
+
+- Refactor `ServiceUnitStateChannelImpl` to accept `ServiceUnitStateTableView` 
interface and `ServiceUnitStateTableViewImpl` system topic implementation. 
+- Introduce `MetadataStoreTableView` interface to support 
`ServiceUnitStateMetadataStoreTableViewImpl` implementation.
+- `MetadataStoreTableViewImpl` will use a shadow hashmap to maintain the metadata tableview. It will initially fill the local tableview by scanning all existing items in the metadata store path. After that, new items will be applied to the tableview via metadata watch notifications.
+- Add `BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer` to `MetadataCacheConfig` to listen for the automatic asynchronous cache reload. This can be used to re-sync the shadow hashmap in `MetadataStoreTableViewImpl` if it becomes outdated in the worst case (e.g. network or metadata store issues).
+- Introduce `ServiceUnitStateTableViewSyncer` to sync system topic and 
metadata store table views to migrate to one from the other. This syncer can be 
enabled by a dynamic config, `loadBalancerServiceUnitTableViewSyncerEnabled`.
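
The shadow-hashmap behavior described in the list above (initial scan, then watch notifications) can be illustrated with a minimal sketch. It is not the `MetadataStoreTableViewImpl` code: `FakeStore`, `ShadowTableView`, and the `String` value type are hypothetical stand-ins chosen so the example runs on its own, and the choice to register the watcher before scanning is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for a metadata store path; not a Pulsar API.
class FakeStore {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, Optional<String>>> watchers = new ArrayList<>();

    void watch(BiConsumer<String, Optional<String>> watcher) {
        watchers.add(watcher);
    }

    // Returns a snapshot of all existing items.
    Map<String, String> scan() {
        return Map.copyOf(data);
    }

    void put(String key, String value) {
        data.put(key, value);
        watchers.forEach(w -> w.accept(key, Optional.of(value)));
    }

    void delete(String key) {
        if (data.remove(key) != null) {
            watchers.forEach(w -> w.accept(key, Optional.empty()));
        }
    }
}

// Shadow-map table view: fill from a scan, then follow watch notifications.
class ShadowTableView {
    private final Map<String, String> shadow = new ConcurrentHashMap<>();

    void start(FakeStore store) {
        // Assumption of this sketch: watch first, then scan, so later updates are not missed.
        store.watch((key, value) -> {
            if (value.isPresent()) {
                shadow.put(key, value.get());
            } else {
                shadow.remove(key);
            }
        });
        shadow.putAll(store.scan());
    }

    String get(String key) {
        return shadow.get(key);
    }

    int size() {
        return shadow.size();
    }
}

class ShadowTableViewDemo {
    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        store.put("bundle-a", "broker-1");      // existing item, picked up by the scan
        ShadowTableView view = new ShadowTableView();
        view.start(store);
        store.put("bundle-b", "broker-2");      // tail item, picked up by the watcher
        store.delete("bundle-a");               // deletion, also delivered by the watcher
        System.out.println(view.get("bundle-a") + " " + view.get("bundle-b"));
    }
}
```

Reads are served from the local map only, which is why a missed notification would leave the view stale until something re-syncs it.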
+
+## Detailed Design
+
+### Design & Implementation Details
+```java
+/**
+ * Given that the ServiceUnitStateChannel event-sources service unit (bundle) 
ownership states via a persistent store
+ * and reacts to ownership changes, the ServiceUnitStateTableView provides an 
interface to the
+ * ServiceUnitStateChannel's persistent store and its locally replicated 
ownership view (tableview) with listener
+ * registration. It initially populates its local table view by scanning 
existing items in the remote store. The
+ * ServiceUnitStateTableView receives notifications whenever ownership states 
are updated in the remote store, and
+ * upon notification, it applies the updates to its local tableview with the 
listener logic.
+ */
+public interface ServiceUnitStateTableView extends Closeable {
+
+    /**
+     * Starts the tableview.
+     * It initially populates its local table view by scanning existing items 
in the remote store, and it starts
+     * listening to service unit ownership changes from the remote store.
+     * @param pulsar pulsar service reference
+     * @param tailItemListener listener for tail (newly updated) items
+     * @param existingItemListener listener for existing items
+     * @throws IOException if it fails to init the tableview.
+     */
+    void start(PulsarService pulsar,
+               BiConsumer<String, ServiceUnitStateData> tailItemListener,
+               BiConsumer<String, ServiceUnitStateData> existingItemListener) throws IOException;
+
+
+    /**
+     * Closes the tableview.
+     * @throws IOException if it fails to close the tableview.
+     */
+    void close() throws IOException;
+
+    /**
+     * Gets one item from the local tableview.
+     * @param key the key to get
+     * @return value if exists. Otherwise, null.
+     */
+    ServiceUnitStateData get(String key);
+
+    /**
+     * Tries to put the item in the persistent store.
+     * If it completes, all peer tableviews (including the local one) will be 
notified and be eventually consistent
+     * with this put value.
+     *
+     * It ignores put operation if the input value conflicts with the existing 
one in the persistent store.
+     *
+     * @param key the key to put
+     * @param value the value to put
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> put(String key, ServiceUnitStateData value);
+
+    /**
+     * Tries to delete the item from the persistent store.
+     * All peer tableviews (including the local one) will be notified and be 
eventually consistent with this deletion.
+     *
+     * It ignores delete operation if the key is not present in the persistent 
store.
+     *
+     * @param key the key to delete
+     * @return a future to track the completion of the operation
+     */
+    CompletableFuture<Void> delete(String key);
+
+    /**
+     * Returns the entry set of the items in the local tableview.
+     * @return entry set
+     */
+    Set<Map.Entry<String, ServiceUnitStateData>> entrySet();
+
+    /**
+     * Returns service units (namespace bundles) owned by this broker.
+     * @return a set of owned service units (namespace bundles)
+     */
+    Set<NamespaceBundle> ownedServiceUnits();
+
+    /**
+     * Tries to flush any batched or buffered updates.
+     * @param waitDurationInMillis time to wait until complete.
+     * @throws ExecutionException
+     * @throws InterruptedException
+     * @throws TimeoutException
+     */
+    void flush(long waitDurationInMillis) throws ExecutionException, InterruptedException, TimeoutException;
+}
+```
+
+```java
+/**
+ * Defines metadata store tableview.
+ * MetadataStoreTableView initially fills its local tableview with existing items and then eventually
+ * synchronizes remote updates from the remote metadata store to its local tableview.
+ * This abstraction can help replicate metadata in memory from metadata store.
+ */
+public interface MetadataStoreTableView<T> {
+
+    class ConflictException extends RuntimeException {
+        public ConflictException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
+     * Starts the tableview by filling existing items to its local tableview 
from the remote metadata store.
+     */
+    void start() throws MetadataStoreException;
+
+    /**
+     * Reads whether a specific key exists in the local tableview.
+     *
+     * @param key the key to check
+     * @return true if exists. Otherwise, false.
+     */
+    boolean exists(String key);
+
+    /**
+     * Gets one item from the local tableview.
+     * <p>
+     * If the key is not found, return null.
+     *
+     * @param key the key to check
+     * @return value if exists. Otherwise, null.
+     */
+    T get(String key);
+
+    /**
+     * Tries to put the item in the persistent store.
+     * All peer tableviews (including the local one) will be notified and be 
eventually consistent with this put value.
+     * <p>
+     * This operation can fail if the input value conflicts with the existing 
one.
+     *
+     * @param key the key to put
+     * @param value the value to put
+     * @return a future to track the completion of the operation
+     * @throws MetadataStoreTableView.ConflictException
+     *             if the input value conflicts with the existing one.
+     */
+    CompletableFuture<Void> put(String key, T value);
+
+    /**
+     * Tries to delete the item from the persistent store.
+     * All peer tableviews (including the local one) will be notified and be 
eventually consistent with this deletion.
+     * <p>
+     * This can fail if the item is not present in the metadata store.
+     *
+     * @param key the key to delete
+     * @return a future to track the completion of the operation
+     * @throws MetadataStoreException.NotFoundException
+     *             if the key is not present in the metadata store.
+     */
+    CompletableFuture<Void> delete(String key);
+
+    /**
+     * Returns the size of the items in the local tableview.
+     * @return size
+     */
+    int size();
+
+    /**
+     * Reads whether the local tableview is empty or not.
+     * @return true if empty. Otherwise, false
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns the entry set of the items in the local tableview.
+     * @return entry set
+     */
+    Set<Map.Entry<String, T>> entrySet();
+
+    /**
+     * Returns the key set of the items in the local tableview.
+     * @return key set
+     */
+    Set<String> keySet();
+
+    /**
+     * Returns the values of the items in the local tableview.
+     * @return values
+     */
+    Collection<T> values();
+
+    /**
+     * Runs the action for each item in the local tableview.
+     */
+    void forEach(BiConsumer<String, T> action);
+}
+```
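
The `put` contract above (the returned future fails when the input value conflicts with the existing one) can be sketched as follows. This is an illustration under assumptions, not the PIP's implementation: `ConflictAwareTable` and its `conflicts` predicate are hypothetical, the top-level `ConflictException` only mirrors the nested class in the interface, and an in-memory map stands in for the metadata store.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiPredicate;

// Local stand-in for MetadataStoreTableView.ConflictException.
class ConflictException extends RuntimeException {
    ConflictException(String msg) {
        super(msg);
    }
}

class ConflictAwareTable<T> {
    private final Map<String, T> store = new ConcurrentHashMap<>();
    // Hypothetical rule: test(existing, incoming) is true when incoming must not replace existing.
    private final BiPredicate<T, T> conflicts;

    ConflictAwareTable(BiPredicate<T, T> conflicts) {
        this.conflicts = conflicts;
    }

    CompletableFuture<Void> put(String key, T value) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            // compute() runs atomically per key; if the function throws, the mapping is unchanged.
            store.compute(key, (k, existing) -> {
                if (existing != null && conflicts.test(existing, value)) {
                    throw new ConflictException("conflicting value for key " + k);
                }
                return value;
            });
            result.complete(null);
        } catch (ConflictException e) {
            // Report the conflict through the future rather than throwing to the caller.
            result.completeExceptionally(e);
        }
        return result;
    }

    T get(String key) {
        return store.get(key);
    }
}

class ConflictAwareTableDemo {
    public static void main(String[] args) {
        // Example rule: values are versions, and an older or equal version is a conflict.
        ConflictAwareTable<Long> table =
                new ConflictAwareTable<>((existing, incoming) -> incoming <= existing);
        table.put("bundle-a", 2L).join();
        boolean rejected = table.put("bundle-a", 1L).isCompletedExceptionally();
        System.out.println("stale put rejected: " + rejected);
    }
}
```

A caller that prefers the "ignore on conflict" behavior described for `ServiceUnitStateTableView.put` could handle the failed future and drop the `ConflictException` instead of propagating it.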
+
+```java
+public class MetadataCacheConfig<T> {
+    private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
+
+    ...
+
+    /**
+     * Specifies cache reload consumer behavior when the cache is refreshed 
automatically at refreshAfterWriteMillis
+     * frequency.
+     */
+    @Builder.Default
+    private final BiConsumer<String, Optional<CacheGetResult<T>>> asyncReloadConsumer = null;
+```
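
A reload consumer of this shape could be used to repair a stale shadow map, roughly as sketched here. The sketch simplifies the value type to `Optional<String>` in place of `Optional<CacheGetResult<T>>`, and it assumes that an empty `Optional` means the key no longer exists in the store; `ReloadResync` is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

class ReloadResync {
    // Shadow map that may have missed a watch notification.
    final Map<String, String> shadow = new ConcurrentHashMap<>();

    // Stand-in for asyncReloadConsumer: invoked with the key and the freshly reloaded value.
    // Assumption: an empty Optional means the key was deleted from the store.
    final BiConsumer<String, Optional<String>> reloadConsumer = (key, reloaded) -> {
        if (reloaded.isPresent()) {
            shadow.put(key, reloaded.get());
        } else {
            shadow.remove(key);
        }
    };
}

class ReloadResyncDemo {
    public static void main(String[] args) {
        ReloadResync resync = new ReloadResync();
        resync.shadow.put("bundle-a", "stale-owner");
        // Simulates the periodic cache reload delivering the current value.
        resync.reloadConsumer.accept("bundle-a", Optional.of("current-owner"));
        System.out.println(resync.shadow.get("bundle-a"));
    }
}
```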
+
+```java
+
+/**
+ * ServiceUnitStateTableViewSyncer can be used to sync system topic and 
metadata store table views to migrate to one
+ * from the other.
+ */
+@Slf4j
+public class ServiceUnitStateTableViewSyncer implements Closeable {
+    private static final int SYNC_TIMEOUT_IN_SECS = 30;
+    private volatile ServiceUnitStateTableView systemTopicTableView;
+    private volatile ServiceUnitStateTableView metadataStoreTableView;
+
+
+    public void start(PulsarService pulsar) throws IOException {
+        if (!pulsar.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()) {
+            return;
+        }
+        try {
+            if (systemTopicTableView == null) {
+                systemTopicTableView = new ServiceUnitStateTableViewImpl();
+                systemTopicTableView.start(
+                        pulsar,
+                        this::syncToMetadataStore,
+                        this::syncToMetadataStore);
+                log.info("Successfully started ServiceUnitStateTableViewSyncer::systemTopicTableView");
+            }
+
+            if (metadataStoreTableView == null) {
+                metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
+                metadataStoreTableView.start(
+                        pulsar,
+                        this::syncToSystemTopic,
+                        this::syncToSystemTopic);
+                log.info("Successfully started ServiceUnitStateTableViewSyncer::metadataStoreTableView");
+            }
+
+        } catch (Throwable e) {
+            log.error("Failed to start ServiceUnitStateTableViewSyncer", e);
+            throw e;
+        }
+    }
+
+    private void syncToSystemTopic(String key, ServiceUnitStateData data) {
+        try {
+            systemTopicTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        } catch (Throwable e) {
+            log.error("SystemTopicTableView failed to sync key:{}, data:{}", key, data, e);
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private void syncToMetadataStore(String key, ServiceUnitStateData data) {
+        try {
+            metadataStoreTableView.put(key, data).get(SYNC_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        } catch (Throwable e) {
+            log.error("metadataStoreTableView failed to sync key:{}, data:{}", key, data, e);
+            throw new IllegalStateException(e);
+        }
+    }
+...
+}
+
+
+```
+
+### Configuration
+
+- Add a `loadManagerServiceUnitStateTableViewClassName` configuration to specify the `ServiceUnitStateTableView` implementation class name.
+- Add a `loadBalancerServiceUnitTableViewSyncerEnabled` configuration to enable `ServiceUnitStateTableViewSyncer`, which syncs the metadata store and system topic `ServiceUnitStateTableView` implementations during migration.
+
+## Backward & Forward Compatibility
+
+It will be backward and forward compatible, as `loadManagerServiceUnitStateTableViewClassName` will default to `ServiceUnitStateTableViewImpl` (the system topic implementation).
+
+We will introduce `ServiceUnitStateTableViewSyncer` to sync system topic and 
metadata store table views when migrating to 
ServiceUnitStateMetadataStoreTableViewImpl from ServiceUnitStateTableViewImpl 
and vice versa. This syncer can be enabled/disabled by a dynamic config, 
`loadBalancerServiceUnitTableViewSyncerEnabled`. The admin could enable this 
syncer before migration and disable it after it is finished.
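
The enable-then-disable migration flow can be pictured with a small two-way sync sketch. Everything in it is hypothetical (`InMemoryView`, `TwoWaySyncer`, `String` values); in particular, it assumes that a put which does not change the stored value fires no listener, which is what keeps the two views from echoing updates back and forth forever. It also skips the replay of existing items that the real syncer performs through `existingItemListener`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical in-memory table view with a single tail listener.
class InMemoryView {
    private final Map<String, String> items = new ConcurrentHashMap<>();
    private BiConsumer<String, String> tailListener = (k, v) -> { };

    void start(BiConsumer<String, String> tailListener) {
        this.tailListener = tailListener;
    }

    void put(String key, String value) {
        String previous = items.put(key, value);
        // Assumption: unchanged values do not notify, which stops sync ping-pong.
        if (!Objects.equals(previous, value)) {
            tailListener.accept(key, value);
        }
    }

    String get(String key) {
        return items.get(key);
    }
}

class TwoWaySyncer {
    final InMemoryView systemTopicView = new InMemoryView();
    final InMemoryView metadataStoreView = new InMemoryView();
    // Plays the role of the dynamic config flag.
    private volatile boolean enabled;

    TwoWaySyncer() {
        systemTopicView.start((k, v) -> {
            if (enabled) {
                metadataStoreView.put(k, v);
            }
        });
        metadataStoreView.start((k, v) -> {
            if (enabled) {
                systemTopicView.put(k, v);
            }
        });
    }

    void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }
}

class TwoWaySyncerDemo {
    public static void main(String[] args) {
        TwoWaySyncer syncer = new TwoWaySyncer();
        syncer.setEnabled(true);                               // before migration
        syncer.systemTopicView.put("bundle-a", "broker-1");    // mirrored to the other view
        syncer.setEnabled(false);                              // after migration
        syncer.systemTopicView.put("bundle-a", "broker-2");    // no longer mirrored
        System.out.println(syncer.metadataStoreView.get("bundle-a"));
    }
}
```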
+
+## Alternatives
+
+## General Notes
+
+## Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/v7sod21r56hkt2cjxl9pp348r4jxo6o8
+* Mailing List voting thread: 
https://lists.apache.org/thread/j453xp0vty8zy2y0ljssjgyvwb47royc
