liangyepianzhou commented on code in PR #21271: URL: https://github.com/apache/pulsar/pull/21271#discussion_r1349651345
########## pip/pip-302.md: ########## @@ -0,0 +1,104 @@ +# Background Knowledge + +The TableView interface provides a convenient way to access data in a compacted topic by offering a continuously updated key-value map view. It operates on messages with keys and ignores those without keys. + +By utilizing the TableView, Pulsar clients can retrieve all message updates from a specific topic and create a map that contains the latest values for each key. This map can be used to establish a local cache of data. Additionally, clients can register consumers with the TableView and specify a listener to scan the map and receive notifications whenever new messages are received. This functionality enables event-driven applications and message monitoring. + +For more detailed information about the TableView, please refer to the [Pulsar documentation](https://pulsar.apache.org/docs/next/concepts-clients/#tableview). + +# Motivation + +In the context of utilizing the TableView component, there are instances where we aspire to consistently retrieve the most up-to-date value associated with a given key. +When read and write operations for a certain key do not occur simultaneously, we can refresh the TableView before reading the key to obtain the latest value for this key. +To accomplish this, we can employ an API `refreshAsync` that allows us to refresh the TableView before accessing the value corresponding to the desired key. +# Goals + +## In Scope + +The proposed changes aim to add a new API that triggers the reading of all existing messages that before the trigger time, +updates the TableView, and waits for the read operation to complete. This will be achieved by introducing the `refreshAsync()` method. + +## Out of Scope + +No out-of-scope items have been identified at this time. + +# High-Level Design + +The high-level design involves the addition of a new API method called `refreshAsync()`. +This method will be responsible for initiating the process of reading all existing messages that before the trigger time, +updating the TableView, and returning a future to wait for the read operation to complete. + +# Detailed Design + +## Design & Implementation Details + +The `refreshAsync()` method will be implemented as follows: + +```java +@Override +public CompletableFuture<Void> refreshAsync() { + return reader.thenCompose(this::readAllExistingMessages); +} +``` + +# Public-Facing Changes + +## Public API + +The following changes will be made to the public API: + +### `refreshAsync()` + +This new API method triggers the reading of all existing messages, updates the TableView, and waits for the read operation to complete. +```java +/** + * Triggers the reading of all existing messages from the topics, updates the TableView and waits for the read operation to complete. + * This method fetches the last message of the topics at the point of invocation and updates the TableView with all messages up to and including this last message. + * After the update is complete, users can use the TableView to obtain the latest value for any key. + * Note that the 'latest' value refers to the value at the point of calling refresh, not necessarily the current latest if more messages have been produced in the meantime. + * + * @return a CompletableFuture that completes when all existing messages up to the point of invocation have been read, and the TableView has been updated. + * + * Example usage: + * table.refreshAsync().whenComplete((__, ex) -> { + * if (ex == null) { + * table.get(key); + * } + * }); Review Comment: Okay -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
