This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b87c0fb01bc [fix] [ml] fix discontinuous ledger deletion (#20898)
b87c0fb01bc is described below

commit b87c0fb01bc6bab195987775221f5985faf68399
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 1 22:37:37 2023 +0800

    [fix] [ml] fix discontinuous ledger deletion (#20898)
    
    ### Motivation
    
    - The task `trim ledgers` runs in the thread 
`BkMainThreadPool.choose(ledgerName)`
    - The task `write entries to BK` runs in the thread 
`BkMainThreadPool.choose(ledgerId)`
    
    So the two tasks above may run concurrently.
    
    The task `trim ledgers` works as follows:
    - find the ledgers that no longer need to be read; the result is `{Ledgers before the slowest read}`.
    - check whether each of the `{Ledgers before the slowest read}` is outside the retention policy; the result is `{Ledgers to be deleted}`.
      - if the creation time of the ledger is earlier than the earliest retention time, mark it for deletion
      - if, after deleting this ledger, the remaining ledgers are still larger than the retention size, mark it for deletion
    - delete the `{Ledgers to be deleted}`
    
    **(Highlight)** There is a scenario that causes the task `trim ledgers` to delete ledgers discontinuously, resulting in consumers receiving messages discontinuously:
    - context:
      - ledgers: `[{id=1, size=100}, {id=2,size=100}]`
      - retention size: 150
      - no cursor there
    - Check `ledger 1`: it is skipped by the retention check `(200 - 100) < 150`.
    - One in-flight write finishes, so `calculateTotalSizeWrited()` now returns `300`.
    - Check `ledger 2`: the retention check `(300 - 100) > 150` holds, so `ledger 2` is marked for deletion.
    - Delete `ledger 2`.
    - Create a new consumer. It will receive messages from `[ledger-1, ledger-3]`, but `ledger-2` will be skipped.
    
    ### Modifications
    
    Once the retention constraint has been met, break the loop.
    
    (cherry picked from commit 782e91fe327efe2c9c9107d6c679c2837d43935b)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 60 ++++++++++++++++------
 .../pulsar/common/util/LazyLoadableValue.java      | 42 +++++++++++++++
 2 files changed, 86 insertions(+), 16 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 10f7948f553..14f4bfed871 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -138,6 +138,7 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.LazyLoadableValue;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.metadata.api.Stat;
 import org.slf4j.Logger;
@@ -2559,15 +2560,13 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
     }
 
-    private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
-        return config.getRetentionTimeMillis() >= 0
-                && clock.millis() - ledgerTimestamp > 
config.getRetentionTimeMillis();
+    private boolean hasLedgerRetentionExpired(long retentionTimeMs, long 
ledgerTimestamp) {
+        return retentionTimeMs >= 0 && clock.millis() - ledgerTimestamp > 
retentionTimeMs; // a negative retentionTimeMs never expires a ledger
     }
 
-    private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
+    private boolean isLedgerRetentionOverSizeQuota(long retentionSizeInMB, 
long totalSizeOfML, long sizeToDelete) {
         // Handle the -1 size limit as "infinite" size quota
-        return config.getRetentionSizeInMB() >= 0
-                && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= 
config.getRetentionSizeInMB() * MegaByte;
+        return retentionSizeInMB >= 0 && totalSizeOfML - sizeToDelete >= 
retentionSizeInMB * MegaByte; // true if the size left after deleting sizeToDelete is still >= the quota
     }
 
     boolean isOffloadedNeedsDelete(OffloadContext offload, 
Optional<OffloadPolicies> offloadPolicies) {
@@ -2626,6 +2625,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             }
 
             long slowestReaderLedgerId = -1;
+            final LazyLoadableValue<Long> slowestNonDurationLedgerId =
+                    new LazyLoadableValue(() -> 
getTheSlowestNonDurationReadPosition().getLedgerId());
+            final long retentionSizeInMB = config.getRetentionSizeInMB();
+            final long retentionTimeMs = config.getRetentionTimeMillis();
+            final long totalSizeOfML = TOTAL_SIZE_UPDATER.get(this);
             if (!cursors.hasDurableCursors()) {
                 // At this point the lastLedger will be pointing to the
                 // ledger that has just been closed, therefore the +1 to
@@ -2648,7 +2652,10 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
             long totalSizeToDelete = 0;
             // skip ledger if retention constraint met
-            for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, 
false).values()) {
+            Iterator<LedgerInfo> ledgerInfoIterator =
+                    ledgers.headMap(slowestReaderLedgerId, 
false).values().iterator();
+            while (ledgerInfoIterator.hasNext()){
+                LedgerInfo ls = ledgerInfoIterator.next();
                 // currentLedger can not be deleted
                 if (ls.getLedgerId() == currentLedger.getId()) {
                     if (log.isDebugEnabled()) {
@@ -2668,8 +2675,9 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
                 }
 
                 totalSizeToDelete += ls.getSize();
-                boolean overRetentionQuota = 
isLedgerRetentionOverSizeQuota(totalSizeToDelete);
-                boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
+                boolean overRetentionQuota = 
isLedgerRetentionOverSizeQuota(retentionSizeInMB, totalSizeOfML,
+                        totalSizeToDelete);
+                boolean expired = hasLedgerRetentionExpired(retentionTimeMs, 
ls.getTimestamp());
                 if (log.isDebugEnabled()) {
                     log.debug(
                             "[{}] Checking ledger {} -- time-old: {} sec -- "
@@ -2686,14 +2694,19 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
                     }
                     ledgersToDelete.add(ls);
                 } else {
-                    if (ls.getLedgerId() < 
getTheSlowestNonDurationReadPosition().getLedgerId()) {
-                        // once retention constraint has been met, skip check
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] Ledger {} not deleted. Neither 
expired nor over-quota", name,
-                                    ls.getLedgerId());
-                        }
-                        invalidateReadHandle(ls.getLedgerId());
+                    // once retention constraint has been met, skip check
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Ledger {} not deleted. Neither expired 
nor over-quota", name, ls.getLedgerId());
                     }
+                    releaseReadHandleIfNoLongerRead(ls.getLedgerId(), 
slowestNonDurationLedgerId.getValue());
+                    break;
+                }
+            }
+
+            while (ledgerInfoIterator.hasNext()) {
+                LedgerInfo ls = ledgerInfoIterator.next();
+                if (!releaseReadHandleIfNoLongerRead(ls.getLedgerId(), 
slowestNonDurationLedgerId.getValue())) {
+                    break;
                 }
             }
 
@@ -2779,6 +2792,21 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         }
     }
 
+    /**
+     * Invalidates the cached read handle of {@code ledgerId} if it is below {@code slowestNonDurationLedgerId}.
+     * @return {@code true} if the read handle was invalidated, {@code false} otherwise.
+     */
+    private boolean releaseReadHandleIfNoLongerRead(long ledgerId, long 
slowestNonDurationLedgerId) {
+        if (ledgerId < slowestNonDurationLedgerId) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Ledger {} no longer needs to be read, close 
the cached readHandle", name, ledgerId);
+            }
+            invalidateReadHandle(ledgerId);
+            return true;
+        }
+        return false;
+    }
+
+
     protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
         PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
         // Update metadata
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java
new file mode 100644
index 00000000000..063d434a64f
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.util.function.Supplier;
+
+/**
+ * Computes a value on the first {@link #getValue()} call and caches it (even {@code null}). Not thread-safe.
+ */
+public class LazyLoadableValue<T> {
+    private Supplier<T> loader; // non-null until the value has been loaded, then cleared
+    private T value;
+
+    public LazyLoadableValue(Supplier<T> loader) {
+        this.loader = java.util.Objects.requireNonNull(loader, "loader");
+    }
+
+    /** Returns the cached value, invoking the loader only until it has completed once without throwing. */
+    public T getValue() {
+        if (loader != null) {
+            value = loader.get();
+            loader = null; // mark as loaded (a null result included) and release whatever the supplier captured
+        }
+        return value;
+    }
+}

Reply via email to