lhotari commented on code in PR #19783:
URL: https://github.com/apache/pulsar/pull/19783#discussion_r1841861570


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2615,7 +2627,63 @@ private Optional<OffloadPolicies> 
getOffloadPoliciesIfAppendable() {
         return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
     }
 
+    @VisibleForTesting
+    synchronized List<Long> internalEvictOffloadedLedgers() {
+        int inactiveOffloadedLedgerEvictionTimeMs = 
config.getInactiveOffloadedLedgerEvictionTimeMs();
+        if (inactiveOffloadedLedgerEvictionTimeMs <= 0) {
+            return Collections.emptyList();
+        }
+
+        // allow only one eviction run at a time
+        if (evictOffloadedLedgersInProgress) {
+            return Collections.emptyList();
+        }
+
+        long now = clock.millis();
+        int minimumEvictionIntervalMs = inactiveOffloadedLedgerEvictionTimeMs 
/ MINIMUM_EVICTION_INTERVAL_DIVIDER;
+        if (now - lastEvictOffloadedLedgers < minimumEvictionIntervalMs) {
+            // skip eviction if we have done it recently
+            return Collections.emptyList();
+        }
+
+        try {
+            evictOffloadedLedgersInProgress = true;
+            List<Long> ledgersToRelease = new ArrayList<>();
+
+            ledgerCache.forEach((ledgerId, ledger) -> {
+                if (ledger.isDone() && !ledger.isCompletedExceptionally()) {
+                    ReadHandle readHandle = ledger.join();
+                    if (readHandle instanceof OffloadedLedgerHandle) {
+                        long lastAccessTimestamp = ((OffloadedLedgerHandle) 
readHandle).lastAccessTimestamp();
+                        if (lastAccessTimestamp >= 0) {
+                            long delta = now - lastAccessTimestamp;
+                            if (delta >= 
inactiveOffloadedLedgerEvictionTimeMs) {
+                                log.info("[{}] Offloaded ledger {} can be 
released ({} ms elapsed since last access)",
+                                        name, ledgerId, delta);
+                                ledgersToRelease.add(ledgerId);
+                            } else if (log.isDebugEnabled()) {
+                                log.debug(
+                                        "[{}] Offloaded ledger {} cannot be 
released ({} ms elapsed since last access)",
+                                        name, ledgerId, delta);
+                            }
+                        }
+                    }
+                }
+            });
+            for (Long ledgerId : ledgersToRelease) {
+                invalidateReadHandle(ledgerId);
+            }
+            return ledgersToRelease;
+        } finally {
+            lastEvictOffloadedLedgers = now;
+            evictOffloadedLedgersInProgress = false;
+        }
+    }
+
     void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) 
{
+
+        internalEvictOffloadedLedgers();

Review Comment:
   `internalEvictOffloadedLedgers` gets called when `internalTrimLedgers` is called.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2615,7 +2627,63 @@ private Optional<OffloadPolicies> 
getOffloadPoliciesIfAppendable() {
         return Optional.ofNullable(ledgerOffloader.getOffloadPolicies());
     }
 
+    @VisibleForTesting
+    synchronized List<Long> internalEvictOffloadedLedgers() {
+        int inactiveOffloadedLedgerEvictionTimeMs = 
config.getInactiveOffloadedLedgerEvictionTimeMs();
+        if (inactiveOffloadedLedgerEvictionTimeMs <= 0) {
+            return Collections.emptyList();
+        }
+
+        // allow only one eviction run at a time
+        if (evictOffloadedLedgersInProgress) {
+            return Collections.emptyList();
+        }
+
+        long now = clock.millis();
+        int minimumEvictionIntervalMs = inactiveOffloadedLedgerEvictionTimeMs 
/ MINIMUM_EVICTION_INTERVAL_DIVIDER;
+        if (now - lastEvictOffloadedLedgers < minimumEvictionIntervalMs) {
+            // skip eviction if we have done it recently
+            return Collections.emptyList();
+        }
+
+        try {
+            evictOffloadedLedgersInProgress = true;
+            List<Long> ledgersToRelease = new ArrayList<>();
+
+            ledgerCache.forEach((ledgerId, ledger) -> {
+                if (ledger.isDone() && !ledger.isCompletedExceptionally()) {
+                    ReadHandle readHandle = ledger.join();
+                    if (readHandle instanceof OffloadedLedgerHandle) {
+                        long lastAccessTimestamp = ((OffloadedLedgerHandle) 
readHandle).lastAccessTimestamp();
+                        if (lastAccessTimestamp >= 0) {
+                            long delta = now - lastAccessTimestamp;
+                            if (delta >= 
inactiveOffloadedLedgerEvictionTimeMs) {
+                                log.info("[{}] Offloaded ledger {} can be 
released ({} ms elapsed since last access)",
+                                        name, ledgerId, delta);
+                                ledgersToRelease.add(ledgerId);
+                            } else if (log.isDebugEnabled()) {
+                                log.debug(
+                                        "[{}] Offloaded ledger {} cannot be 
released ({} ms elapsed since last access)",
+                                        name, ledgerId, delta);
+                            }
+                        }
+                    }
+                }
+            });
+            for (Long ledgerId : ledgersToRelease) {
+                invalidateReadHandle(ledgerId);
+            }
+            return ledgersToRelease;
+        } finally {
+            lastEvictOffloadedLedgers = now;
+            evictOffloadedLedgersInProgress = false;
+        }
+    }
+
     void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) 
{
+
+        internalEvictOffloadedLedgers();

Review Comment:
   @poorbarcode `internalEvictOffloadedLedgers` gets called when `internalTrimLedgers` is called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to