This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bb52721b88cbf3f8772481076c6f9047f65ca9b8
Author: Neng Lu <[email protected]>
AuthorDate: Tue Apr 19 22:18:58 2022 -0700

    TableView should cache created readers (#15178)
    
    (cherry picked from commit b1225fecd8a04106667fe09e98960829e39376af)
---
 .../java/org/apache/pulsar/client/impl/TableViewImpl.java  | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index dc32bd008a5..483b2c1ee63 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -224,9 +224,23 @@ public class TableViewImpl<T> implements TableView<T> {
                 .readCompacted(true)
                 .poolMessages(true)
                 .createAsync()
+                .thenCompose(this::cacheNewReader)
                 .thenCompose(this::readAllExistingMessages);
     }
 
+    /** Caches {@code reader} by topic; fails the returned future if that topic already has a reader. */
+    private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
+        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
+        // One check-and-insert call, no containsKey/put gap; atomic if readers is a concurrent map (TODO confirm).
+        if (this.readers.putIfAbsent(reader.getTopic(), reader) != null) {
+            future.completeExceptionally(
+                    new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
+        } else {
+            future.complete(reader);
+        }
+        return future;
+    }
+
     private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> 
reader) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();

Reply via email to