This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b87c0fb01bc [fix] [ml] fix discontinuous ledger deletion (#20898)
b87c0fb01bc is described below
commit b87c0fb01bc6bab195987775221f5985faf68399
Author: fengyubiao <[email protected]>
AuthorDate: Tue Aug 1 22:37:37 2023 +0800
[fix] [ml] fix discontinuous ledger deletion (#20898)
### Motivation
- The task `trim ledgers` runs in the thread
`BkMainThreadPool.choose(ledgerName)`
- The task `write entries to BK` runs in the thread
`BkMainThreadPool.choose(ledgerId)`
So the two tasks above may run concurrently.
The task `trim ledgers` works as follows:
- Find the ledgers that no longer need to be read; the result is `{Ledgers
before the slowest read}`.
- Check which of the `{Ledgers before the slowest read}` are outside the retention
policy; the result is `{Ledgers to be deleted}`.
  - If the creation time of a ledger is earlier than the earliest retained
time, mark it for deletion.
  - If, after deleting this ledger, the remaining ledgers are still at least as large as
the retention size, mark it for deletion.
- Delete the `{Ledgers to be deleted}`.
**(Highlight)** There is a scenario in which the task `trim ledgers` deletes ledgers
discontinuously, so that consumers receive a discontinuous message stream:
- context:
- ledgers: `[{id=1, size=100}, {id=2,size=100}]`
- retention size: 150
- no cursor there
- Check `ledger 1`: it is kept, because the retention check `(200 - 100) < 150` shows
that deleting it would drop below the retention size.
- An in-flight write finishes, so `calculateTotalSizeWrited()` now
returns `300`.
- Check `ledger 2`: the retention check `(300 - 100) > 150` passes, so `ledger 2`
is marked for deletion.
- `ledger 2` is deleted.
- A newly created consumer receives messages from `[ledger-1,
ledger-3]`; the messages in `ledger-2` are skipped.
### Modifications
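Read the retention size, the retention time and the total size of the managed ledger once,
before the loop, so that a concurrently completing write cannot change them in the middle of a
trim pass. Break the loop as soon as a ledger meets the retention constraint. For the remaining
ledgers, only release their cached read handles; none of them is deleted.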
(cherry picked from commit 782e91fe327efe2c9c9107d6c679c2837d43935b)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 60 ++++++++++++++++------
.../pulsar/common/util/LazyLoadableValue.java | 42 +++++++++++++++
2 files changed, 86 insertions(+), 16 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 10f7948f553..14f4bfed871 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -138,6 +138,7 @@ import
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.LazyLoadableValue;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
@@ -2559,15 +2560,13 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
}
- private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
- return config.getRetentionTimeMillis() >= 0
- && clock.millis() - ledgerTimestamp >
config.getRetentionTimeMillis();
+ /**
+ * Returns whether a ledger with the given timestamp is older than the retention time.
+ * A negative {@code retentionTimeMs} disables time-based expiry.
+ *
+ * @param retentionTimeMs retention time in milliseconds; the trimming loop passes a value it
+ * read from the config once per pass.
+ * @param ledgerTimestamp the ledger timestamp, in the same time base as {@code clock.millis()}.
+ */
+ private boolean hasLedgerRetentionExpired(long retentionTimeMs, long
ledgerTimestamp) {
+ return retentionTimeMs >= 0 && clock.millis() - ledgerTimestamp >
retentionTimeMs;
}
+ /**
+ * Returns whether the managed ledger would still hold at least the retention size after
+ * {@code sizeToDelete} is removed, i.e. whether that much data may be trimmed.
+ *
+ * @param retentionSizeInMB retention size quota in MB; a negative value means an unlimited quota.
+ * @param totalSizeOfML total size of the managed ledger (presumably bytes, to match
+ * {@code retentionSizeInMB * MegaByte}); passed as a snapshot so that a write completing
+ * concurrently cannot change it in the middle of one trim pass.
+ * @param sizeToDelete size considered for deletion; the trimming loop passes its running total.
+ */
- private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
+ private boolean isLedgerRetentionOverSizeQuota(long retentionSizeInMB,
long totalSizeOfML, long sizeToDelete) {
// Handle the -1 size limit as "infinite" size quota
- return config.getRetentionSizeInMB() >= 0
- && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >=
config.getRetentionSizeInMB() * MegaByte;
+ return retentionSizeInMB >= 0 && totalSizeOfML - sizeToDelete >=
retentionSizeInMB * MegaByte;
}
boolean isOffloadedNeedsDelete(OffloadContext offload,
Optional<OffloadPolicies> offloadPolicies) {
@@ -2626,6 +2625,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
long slowestReaderLedgerId = -1;
+ final LazyLoadableValue<Long> slowestNonDurationLedgerId =
+ new LazyLoadableValue(() ->
getTheSlowestNonDurationReadPosition().getLedgerId());
+ final long retentionSizeInMB = config.getRetentionSizeInMB();
+ final long retentionTimeMs = config.getRetentionTimeMillis();
+ final long totalSizeOfML = TOTAL_SIZE_UPDATER.get(this);
if (!cursors.hasDurableCursors()) {
// At this point the lastLedger will be pointing to the
// ledger that has just been closed, therefore the +1 to
@@ -2648,7 +2652,10 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
long totalSizeToDelete = 0;
// skip ledger if retention constraint met
- for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId,
false).values()) {
+ Iterator<LedgerInfo> ledgerInfoIterator =
+ ledgers.headMap(slowestReaderLedgerId,
false).values().iterator();
+ while (ledgerInfoIterator.hasNext()){
+ LedgerInfo ls = ledgerInfoIterator.next();
// currentLedger can not be deleted
if (ls.getLedgerId() == currentLedger.getId()) {
if (log.isDebugEnabled()) {
@@ -2668,8 +2675,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
totalSizeToDelete += ls.getSize();
- boolean overRetentionQuota =
isLedgerRetentionOverSizeQuota(totalSizeToDelete);
- boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
+ boolean overRetentionQuota =
isLedgerRetentionOverSizeQuota(retentionSizeInMB, totalSizeOfML,
+ totalSizeToDelete);
+ boolean expired = hasLedgerRetentionExpired(retentionTimeMs,
ls.getTimestamp());
if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
@@ -2686,14 +2694,19 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
ledgersToDelete.add(ls);
} else {
- if (ls.getLedgerId() <
getTheSlowestNonDurationReadPosition().getLedgerId()) {
- // once retention constraint has been met, skip check
- if (log.isDebugEnabled()) {
- log.debug("[{}] Ledger {} not deleted. Neither
expired nor over-quota", name,
- ls.getLedgerId());
- }
- invalidateReadHandle(ls.getLedgerId());
+ // once retention constraint has been met, skip check
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} not deleted. Neither expired
nor over-quota", name, ls.getLedgerId());
}
+ releaseReadHandleIfNoLongerRead(ls.getLedgerId(),
slowestNonDurationLedgerId.getValue());
+ break;
+ }
+ }
+
+ while (ledgerInfoIterator.hasNext()) {
+ LedgerInfo ls = ledgerInfoIterator.next();
+ if (!releaseReadHandleIfNoLongerRead(ls.getLedgerId(),
slowestNonDurationLedgerId.getValue())) {
+ break;
}
}
@@ -2779,6 +2792,21 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
}
}
+ /**
+ * Invalidates the cached read handle of {@code ledgerId} if that ledger precedes the ledger of
+ * the slowest read position.
+ *
+ * @param ledgerId the ledger whose cached read handle may be released.
+ * @param slowestNonDurationLedgerId the ledger id of the position returned by
+ * {@code getTheSlowestNonDurationReadPosition()}; only ledgers with a strictly smaller id are
+ * released.
+ * @return {@code true} if the cached read handle was invalidated, {@code false} otherwise (the
+ * caller uses {@code false} to stop scanning later ledgers).
+ */
+ private boolean releaseReadHandleIfNoLongerRead(long ledgerId, long
slowestNonDurationLedgerId) {
+ // Ledgers strictly before the slowest reader's ledger are presumed not to be read again.
+ if (ledgerId < slowestNonDurationLedgerId) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} no longer needs to be read, close
the cached readHandle", name, ledgerId);
+ }
+ invalidateReadHandle(ledgerId);
+ return true;
+ }
+ return false;
+ }
+
protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java
new file mode 100644
index 00000000000..063d434a64f
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.util.function.Supplier;
+
+/**
+ * Lazily computes a value on first access and caches it for all later accesses.
+ *
+ * <p>The loader runs at most once per successful load; a {@code null} result is cached as well
+ * instead of triggering a reload on every call. If the loader throws, nothing is cached and the
+ * next {@link #getValue()} call invokes it again.
+ *
+ * <p>Not thread-safe: an instance must be confined to a single thread.
+ *
+ * @param <T> the type of the lazily loaded value
+ */
+public class LazyLoadableValue<T> {
+
+    private final Supplier<T> loader;
+
+    /** Whether {@link #value} has been computed; needed because {@code null} is a valid value. */
+    private boolean loaded;
+
+    private T value;
+
+    /**
+     * Creates a lazy value backed by the given loader.
+     *
+     * @param loader computes the value on the first {@link #getValue()} call; must not be null
+     * @throws NullPointerException if {@code loader} is null
+     */
+    public LazyLoadableValue(Supplier<T> loader) {
+        // Fail fast here instead of on the first getValue() call.
+        if (loader == null) {
+            throw new NullPointerException("loader");
+        }
+        this.loader = loader;
+    }
+
+    /**
+     * Returns the value, invoking the loader only on the first successful call.
+     *
+     * @return the cached value, possibly {@code null} if the loader returned {@code null}
+     */
+    public T getValue() {
+        if (!loaded) {
+            value = loader.get();
+            // Set only after a successful load, so a throwing loader is retried next time.
+            loaded = true;
+        }
+        return value;
+    }
+}