codelipenghui commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1372536188
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java:
##########
@@ -110,4 +110,37 @@ public interface TableView<T> extends Closeable {
* @return a future that can used to track when the table view has been
closed.
*/
CompletableFuture<Void> closeAsync();
+
+
+ /**
+ *
Review Comment:
Remove this line.
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java:
##########
@@ -110,4 +110,37 @@ public interface TableView<T> extends Closeable {
* @return a future that can used to track when the table view has been
closed.
*/
CompletableFuture<Void> closeAsync();
+
+
Review Comment:
Remove the extra empty line.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -230,25 +230,35 @@ private void handleMessage(Message<T> msg) {
}
}
- private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T>
reader) {
+ @Override
+ public CompletableFuture<Void> refreshAsync() {
+ return reader.thenCompose(reader -> readAllExistingMessages(reader,
false));
+ }
+
+ @Override
+ public void refresh() throws PulsarClientException {
+ refreshAsync().join();
+ }
+
+ private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader,
boolean readTailMessages) {
long startTime = System.nanoTime();
AtomicLong messagesRead = new AtomicLong();
- CompletableFuture<Reader<T>> future = new CompletableFuture<>();
- readAllExistingMessages(reader, future, startTime, messagesRead);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ readAllExistingMessages(reader, future, startTime, messagesRead,
readTailMessages);
return future;
}
- private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Reader<T>> future, long startTime,
- AtomicLong messagesRead) {
+ private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Void> future, long startTime,
+ AtomicLong messagesRead, boolean
readTailMessages) {
reader.hasMessageAvailableAsync()
.thenAccept(hasMessage -> {
if (hasMessage) {
reader.readNextAsync()
.thenAccept(msg -> {
messagesRead.incrementAndGet();
handleMessage(msg);
- readAllExistingMessages(reader, future,
startTime, messagesRead);
+ readAllExistingMessages(reader, future,
startTime, messagesRead, readTailMessages);
Review Comment:
If I understand correctly, this PR depends on
https://github.com/apache/pulsar/pull/21270, right? Otherwise, the
implementation will not conform to [what the API
says](https://github.com/apache/pulsar/pull/21417/files#diff-4ec98b75d3bcad44ec9b4ee0a8853435c9f20e37880a917a544797a8942f4e41R123-R124).
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -232,33 +232,33 @@ private void handleMessage(Message<T> msg) {
@Override
public CompletableFuture<Void> refreshAsync() {
- return reader.thenCompose(this::readAllExistingMessages);
+ return reader.thenCompose(reader -> readAllExistingMessages(reader,
false));
Review Comment:
Maybe we should consider having a callback queue for the refresh API. For
example:
```
| LastMessageIds -> Future | LastMessageIds -> Future | LastMessageIds -> Future | LastMessageIds -> Future |
```
For each refresh request, we can get the last message IDs from the
broker and then add them, along with the Future, to the queue.
```
| [{topic1 -> (1,2), topic2 -> (3,2)}] -> Future |
```
Then return the future to the caller.
The tailing read will check the head of the queue and complete the
future once all of its last message IDs have been read.
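A minimal sketch of the queue idea above, not the actual `TableViewImpl`
change. The class name `RefreshQueueSketch` and the methods `enqueue` and
`onMessageRead` are hypothetical, and each topic's position is simplified to a
single `long` instead of a real `MessageId`. It also assumes an empty topic
reports a last position of -1.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical pending-refresh queue; positions are simplified to one long per topic. */
class RefreshQueueSketch {

    /** One queued refresh request: the positions to reach and the future to complete. */
    private static final class PendingRefresh {
        final Map<String, Long> targetPositions;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        PendingRefresh(Map<String, Long> targetPositions) {
            this.targetPositions = new HashMap<>(targetPositions);
        }
    }

    private final Queue<PendingRefresh> pending = new ArrayDeque<>();
    // Highest position handled so far per topic (assumption: -1 means nothing read yet).
    private final Map<String, Long> readPositions = new HashMap<>();

    /** Would be called by refreshAsync() after fetching the last positions from the broker. */
    synchronized CompletableFuture<Void> enqueue(Map<String, Long> lastPositions) {
        PendingRefresh request = new PendingRefresh(lastPositions);
        pending.add(request);
        completeReachedRequests(); // the tailing read may already be past these positions
        return request.future;
    }

    /** Would be called by the tailing read after it has handled a message. */
    synchronized void onMessageRead(String topic, long position) {
        readPositions.merge(topic, position, Long::max);
        completeReachedRequests();
    }

    /** Completes requests from the head of the queue while their targets are reached. */
    private void completeReachedRequests() {
        PendingRefresh head;
        while ((head = pending.peek()) != null && reached(head.targetPositions)) {
            pending.poll();
            head.future.complete(null);
        }
    }

    private boolean reached(Map<String, Long> targets) {
        for (Map.Entry<String, Long> target : targets.entrySet()) {
            long read = readPositions.getOrDefault(target.getKey(), -1L);
            if (read < target.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Since requests are only completed from the head, the futures complete in the
order the refresh calls were made. A real implementation would also need to
fail the queued futures on close and decide where the callbacks run.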
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -129,6 +129,23 @@ private Set<String> publishMessages(String topic, int
count, boolean enableBatch
return keys;
}
+ @Test
+ public void testRefreshAPI() throws Exception {
+ String topic = "persistent://public/default/testRefreshAPI";
+ admin.topics().createPartitionedTopic(topic, 3);
+ int count = 50;
+ Set<String> keys = this.publishMessages(topic, count, false);
+ @Cleanup
+ TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
+ .topic(topic)
+ .create();
+ // Call the refresh API to update the tableview. After this operation,
+ // there is no need to use 'Awaitility.await()' when assert the size
and keys of 'tv'.
Review Comment:
You should publish more messages after the table view is created, since
creating the table view already loads all existing data from the broker
before it becomes available to users.
We should also consider stopping or slowing down the `readTailMessages` method
to make sure the refreshed data comes from the refresh API, not from the
`readTailMessages` method. Otherwise, you don't know whether this test protects
the newly added refresh API against breaking changes.