merlimat opened a new issue #12356:
URL: https://github.com/apache/pulsar/issues/12356


   ## Motivation
   
   In many use cases, applications use Pulsar consumers or readers to fetch all
   the updates from a topic and build a map with the latest value of each key
   from the messages received. This is very common when constructing a local
   cache of the data.
   
   We want to offer support for this access pattern directly in the Pulsar
   client API, as a way to encapsulate the complexities of setting this up.
   
   
   ## Goal
   
   Provide a view of the topic data in the form of a read-only map that is
   constantly updated with the latest version of each key.
   
   Additionally, let the application specify a listener so that it can perform
   a scan of the map and then receive notifications when new messages are
   received and applied.
   
   ## API Changes
   
   This proposal only adds new APIs on the client side.
   
   A new type of consumer will be added: the `TableView`.
   
   Example:
   
   ```java
   TableView<Integer> tableView = pulsarClient.newTableView(Schema.INT32)
       .topic(topic)
       .create();
   
   tableView.get("my-key"); // --> 5
   tableView.get("my-other-key"); // --> 7
   ```
   
   When a `TableView` instance is created, it is guaranteed to already hold the
   latest value of each key as of the creation time.
   
   ### API additions
   
   ```java
   interface PulsarClient {
       // ....
       <T> TableViewBuilder<T> newTableView(Schema<T> schema);
   }
   
   interface TableViewBuilder<T> {
       TableViewBuilder<T> loadConf(Map<String, Object> config);
       TableView<T> create() throws PulsarClientException;
       CompletableFuture<TableView<T>> createAsync();
       TableViewBuilder<T> topic(String topic);
       TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
   }
   
   interface TableView<T> extends Closeable {
   
       // Methods similar to those of java.util.Map
       int size();
       boolean isEmpty();
       boolean containsKey(String key);
       T get(String key);
       Set<Map.Entry<String, T>> entrySet();
       Set<String> keySet();
       Collection<T> values();
       void forEach(BiConsumer<String, T> action);
   
       /**
        * Performs the given action for each entry in this map until all entries
        * have been processed or the action throws an exception.
        *
        * When all the entries have been processed, the action will be invoked
        * for every new update that is received from the topic.
        *
        * @param action The action to be performed for each entry
        */
       void forEachAndListen(BiConsumer<String, T> action);
   
       /**
        * Closes the table view and releases the allocated resources.
        *
        * @return a future that can be used to track when the table view has been closed
        */
       CompletableFuture<Void> closeAsync();
   }
   ```
   
   ## Implementation
   
   The `TableView` will be implemented using multiple `Reader` instances, one
   per partition, and will always read starting from the compacted view of the
   topic.
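
   The initial load can be sketched as follows, assuming one reader per partition, each positioned on the compacted view. `PartitionReader`, `TableViewLoader`, and `InMemoryPartitionReader` are hypothetical names; the two reader methods are modelled on `Reader.hasMessageAvailable()` and `Reader.readNext()` from the Pulsar Java client, but here they return plain `(key, value)` entries so the sketch runs without a broker. Draining every reader until it reports no available messages is one possible way to provide the guarantee that a newly created `TableView` already holds the latest value of each key.

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-in for a Pulsar Reader on one partition's compacted view.
   interface PartitionReader<T> {
       boolean hasMessageAvailable();
       Map.Entry<String, T> readNext();
   }

   class TableViewLoader {
       // Drains each partition reader until it reports no more available
       // messages, so the returned map holds the latest value of each key as of
       // this call. A ConcurrentHashMap is used because, after loading, the
       // per-partition readers would keep applying updates concurrently.
       static <T> Map<String, T> loadInitialSnapshot(List<PartitionReader<T>> readers) {
           Map<String, T> data = new ConcurrentHashMap<>();
           for (PartitionReader<T> reader : readers) {
               while (reader.hasMessageAvailable()) {
                   Map.Entry<String, T> msg = reader.readNext();
                   data.put(msg.getKey(), msg.getValue());
               }
           }
           return data;
       }
   }

   // Test double that replays a fixed list of (key, value) messages.
   class InMemoryPartitionReader<T> implements PartitionReader<T> {
       private final Iterator<Map.Entry<String, T>> messages;

       InMemoryPartitionReader(List<Map.Entry<String, T>> messages) {
           this.messages = messages.iterator();
       }

       public boolean hasMessageAvailable() {
           return messages.hasNext();
       }

       public Map.Entry<String, T> readNext() {
           return messages.next();
       }
   }
   ```

   Merging all partitions into one map is only safe if each key always lands on the same partition, as it does with Pulsar's default hash-based routing for keyed messages; a custom message router could break that assumption. A real implementation would also likely drain the partitions concurrently through the asynchronous reader API rather than one after another.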
   
   The time it takes to create a table view can be controlled by configuring
   the topic compaction policies for the given topic or namespace. More
   frequent compaction can lead to much shorter startup times, since less data
   needs to be replayed to reconstruct the `TableView` of the topic.
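
   Why compaction shortens startup can be illustrated with a simplified model of compaction that keeps only the last message for each key, preserving log order (key deletions are not modelled). In this sketch, where the class `CompactionReplay` and its methods are hypothetical, replaying the compacted prefix of the log followed by the uncompacted tail yields the same map as replaying the whole log, while touching fewer messages.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   class CompactionReplay {
       // Simplified compaction model: keeps only the last message for each key,
       // in the order those messages appear in the log.
       static <T> List<Map.Entry<String, T>> compact(List<Map.Entry<String, T>> log) {
           Map<String, Integer> lastIndex = new HashMap<>();
           for (int i = 0; i < log.size(); i++) {
               lastIndex.put(log.get(i).getKey(), i);
           }
           List<Map.Entry<String, T>> compacted = new ArrayList<>();
           for (int i = 0; i < log.size(); i++) {
               if (lastIndex.get(log.get(i).getKey()) == i) {
                   compacted.add(log.get(i));
               }
           }
           return compacted;
       }

       // Replays messages in order into a latest-value-per-key map.
       static <T> Map<String, T> replay(List<Map.Entry<String, T>> messages) {
           Map<String, T> table = new HashMap<>();
           for (Map.Entry<String, T> m : messages) {
               table.put(m.getKey(), m.getValue());
           }
           return table;
       }
   }
   ```

   With the sample log `a=1, b=2, a=3, a=4, b=5, c=6` compacted after the first four messages, the compacted prefix is `b=2, a=4`, so startup replays 4 messages instead of 6 and still ends with `a=4, b=5, c=6`.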
   

