Denovo1998 commented on code in PR #25293:
URL: https://github.com/apache/pulsar/pull/25293#discussion_r2901387078


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java:
##########
@@ -48,7 +49,20 @@ public class TableView<T> {
     protected final Function<TopicName, CompletableFuture<Reader<T>>> 
readerCreator;
     private final Map<String, T> snapshots = new ConcurrentHashMap<>();
     private final long clientOperationTimeoutMs;
-    private final SimpleCache<NamespaceName, Reader<T>> readers;
+    private final SimpleCache<NamespaceName, CompletableFuture<Reader<T>>> 
readers;
+    private final Consumer<CompletableFuture<Reader<T>>> expiration = 
readerFuture -> {
+        if (!readerFuture.isDone()) {
+            readerFuture.handle((reader, t) -> {
+                if (reader != null) {
+                    closeReader(reader);
+                }
+                return null;
+            });
+        } else if (!readerFuture.isCompletedExceptionally()) {
+            // Since the future has done, it will not wait anymore.
+            closeReader(readerFuture.join());

Review Comment:
   This shortened the lock duration but appears to introduce a new implicit 
constraint: clientOperationTimeoutMs must be less than CACHE_EXPIRE_TIMEOUT_MS. 
Otherwise, while getReader() awaits the future, the cache entry could expire 
and register a "close immediately upon completion" action, causing the reader 
retrieved by the caller to already be in the process of closing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to