lhotari commented on code in PR #23901:
URL: https://github.com/apache/pulsar/pull/23901#discussion_r1933523228
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java:
##########
@@ -232,8 +232,66 @@ public PendingRead(PendingReadKey key,
this.ledgerCache = ledgerCache;
}
- private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
- List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
+ public void attach(CompletableFuture<List<EntryImpl>> handle) {
+ handle.whenComplete((entriesToReturn, error) -> {
+ // execute in the completing thread
+ completeAndRemoveFromCache();
+ // execute the callbacks in the managed ledger executor
+ rangeEntryCache.getManagedLedger().getExecutor().execute(() -> {
+ if (error != null) {
+ readEntriesFailed(error);
+ } else {
+ readEntriesComplete(entriesToReturn);
+ }
+ });
+ });
+ }
+
+ private synchronized void completeAndRemoveFromCache() {
+ completed = true;
+ // When the read has completed, remove the instance from the ledgerCache map
+ // so that new reads will go to a new instance.
+ // this is required because we are going to do refcount management
+ // on the results of the callback
+ synchronized (ledgerCache) {
Review Comment:
Yes, it seems it could. I'll remove the `synchronized`. I checked the code,
and there's no need to synchronize when removing from `ledgerCache` since it's a
`ConcurrentHashMap`. There's currently synchronization for adding new entries,
which makes sense since it's needed for the deduplication logic.
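
To illustrate the distinction, here is a minimal sketch; it is not the code in this PR. The class `PendingReadCacheSketch`, its methods, and the `String` keys and results are hypothetical stand-ins. It assumes the add path is a compound check-then-register step that needs a lock for deduplication, while the remove path is a single atomic `ConcurrentHashMap` call that does not.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the pending-read cache; not the PendingReadsManager code.
final class PendingReadCacheSketch {
    private final ConcurrentMap<String, CompletableFuture<String>> cache =
            new ConcurrentHashMap<>();

    // Add path: look for a reusable pending read, otherwise register a new one.
    // In this sketch the lookup and the registration are two separate steps,
    // so they are guarded by a lock to keep the deduplication decision atomic.
    CompletableFuture<String> findOrCreate(String key) {
        synchronized (cache) {
            CompletableFuture<String> existing = cache.get(key);
            if (existing != null) {
                return existing;
            }
            CompletableFuture<String> created = new CompletableFuture<>();
            cache.put(key, created);
            return created;
        }
    }

    // Remove path: a single atomic map operation, so no extra lock is taken.
    // remove(key, value) only removes the mapping if it still points to this
    // exact instance, so a newer pending read under the same key is untouched.
    boolean completeAndRemove(String key, CompletableFuture<String> read, String result) {
        boolean removed = cache.remove(key, read);
        read.complete(result);
        return removed;
    }

    int size() {
        return cache.size();
    }
}
```

With this shape, a caller that arrives after `completeAndRemove` gets a fresh instance from `findOrCreate`, which is the behavior the comment in the diff describes ("so that new reads will go to a new instance").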