This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bb52721b88cbf3f8772481076c6f9047f65ca9b8 Author: Neng Lu <[email protected]> AuthorDate: Tue Apr 19 22:18:58 2022 -0700 TableView should cache created readers (#15178) (cherry picked from commit b1225fecd8a04106667fe09e98960829e39376af) --- .../java/org/apache/pulsar/client/impl/TableViewImpl.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index dc32bd008a5..483b2c1ee63 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -224,9 +224,23 @@ public class TableViewImpl<T> implements TableView<T> { .readCompacted(true) .poolMessages(true) .createAsync() + .thenCompose(this::cacheNewReader) .thenCompose(this::readAllExistingMessages); } + private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) { + CompletableFuture<Reader<T>> future = new CompletableFuture<>(); + if (this.readers.containsKey(reader.getTopic())) { + future.completeExceptionally( + new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed")); + } else { + this.readers.put(reader.getTopic(), reader); + future.complete(reader); + } + + return future; + } + private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) { long startTime = System.nanoTime(); AtomicLong messagesRead = new AtomicLong();
