This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b1225fecd8a TableView should cache created readers (#15178)
b1225fecd8a is described below
commit b1225fecd8a04106667fe09e98960829e39376af
Author: Neng Lu <[email protected]>
AuthorDate: Tue Apr 19 22:18:58 2022 -0700
TableView should cache created readers (#15178)
---
.../java/org/apache/pulsar/client/impl/TableViewImpl.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 4a7f8d5469f..6638d416886 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -224,9 +224,23 @@ public class TableViewImpl<T> implements TableView<T> {
.readCompacted(true)
.poolMessages(true)
.createAsync()
+ .thenCompose(this::cacheNewReader)
.thenCompose(this::readAllExistingMessages);
}
+    /**
+     * Registers a newly created reader in the {@code readers} cache, keyed by the reader's topic.
+     *
+     * <p>The lookup and the insert are done with one {@code putIfAbsent} call instead of a
+     * {@code containsKey} check followed by a separate {@code put}, so there is no window between
+     * "check" and "insert" in which another reader for the same topic could be registered and
+     * then overwritten.
+     *
+     * @param reader the reader that has just been created
+     * @return a future completed with {@code reader} when it was cached, or completed
+     *         exceptionally with an {@link IllegalArgumentException} when a reader is already
+     *         cached for the same topic
+     */
+    private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
+        String topic = reader.getTopic();
+        // NOTE(review): this is only atomic if `readers` is a ConcurrentMap - confirm at the field declaration.
+        Reader<T> existing = this.readers.putIfAbsent(topic, reader);
+        if (existing == null) {
+            return CompletableFuture.completedFuture(reader);
+        }
+
+        // NOTE(review): the rejected duplicate reader is not closed here (same as before) - confirm who owns it.
+        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
+        future.completeExceptionally(
+                new IllegalArgumentException("reader on partition " + topic + " already existed"));
+        return future;
+    }
+
private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T>
reader) {
long startTime = System.nanoTime();
AtomicLong messagesRead = new AtomicLong();