This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 80f30e40012 [improve][broker]Improve PersistentMessageExpiryMonitor
expire speed when ledger not existed (#17842)
80f30e40012 is described below
commit 80f30e40012dbdd8a7d7af988a5dff4011579841
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Mon Oct 3 23:13:03 2022 +0800
[improve][broker]Improve PersistentMessageExpiryMonitor expire speed when
ledger not existed (#17842)
(cherry picked from commit af11c32611e41d708e76264d47ef9501534c6c5c)
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 7 ++++++
.../bookkeeper/mledger/ManagedLedgerException.java | 6 +++++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 +++++++++++++++++++--
.../persistent/PersistentMessageExpiryMonitor.java | 26 ++++++++++++++++++++--
.../offload/jcloud/impl/MockManagedLedger.java | 7 ++++++
5 files changed, 67 insertions(+), 4 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index cd39919a3b3..083ccd939b1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
import io.netty.buffer.ByteBuf;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
@@ -631,6 +632,12 @@ public interface ManagedLedger {
*/
CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
+ /**
+ * Get basic ledger summary.
+ * will get {@link Optional#empty()} if corresponding ledger not exists.
+ */
+ Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId);
+
/**
* Truncate ledgers
* The truncate operation will move all cursors to the end of the topic
and delete all inactive ledgers.
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 0dc820ec46d..fa6dc59d147 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -147,6 +147,12 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class LedgerNotExistException extends
NonRecoverableLedgerException {
+ public LedgerNotExistException(String msg) {
+ super(msg);
+ }
+ }
+
public static class InvalidReplayPositionException extends
ManagedLedgerException {
public InvalidReplayPositionException(String msg) {
super(msg);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d35d5cf6c5c..364a7247b97 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -99,6 +99,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
@@ -1748,6 +1749,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return result;
}
+ @Override
+ public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
+ return Optional.ofNullable(ledgers.get(ledgerId));
+ }
+
CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
@@ -1853,7 +1859,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
} else {
log.error("[{}] Failed to get message with ledger {}:{} the
ledgerId does not belong to this topic "
+ "or has been deleted.", name, position.getLedgerId(),
position.getEntryId());
- callback.readEntryFailed(new
ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ callback.readEntryFailed(new LedgerNotExistException("Message not
found, "
+ "the ledgerId does not belong to this topic or has been
deleted"), ctx);
}
@@ -3656,11 +3662,26 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
}
+ private static boolean isLedgerNotExistException(int rc) {
+ switch (rc) {
+ case Code.NoSuchLedgerExistsException:
+ case Code.NoSuchLedgerExistsOnMetadataServerException:
+ return true;
+
+ default:
+ return false;
+ }
+ }
+
public static ManagedLedgerException createManagedLedgerException(int
bkErrorCode) {
if (bkErrorCode == BKException.Code.TooManyRequestsException) {
return new TooManyRequestsException("Too many request error from
bookies");
} else if (isBkErrorNotRecoverable(bkErrorCode)) {
- return new
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
+ if (isLedgerNotExistException(bkErrorCode)) {
+ return new
LedgerNotExistException(BKException.getMessage(bkErrorCode));
+ } else {
+ return new
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
+ }
} else {
return new
ManagedLedgerException(BKException.getMessage(bkErrorCode));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 1ffaecd9f74..32cff81a64f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -26,8 +26,10 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -190,8 +192,28 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback {
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}",
topicName, subName, failedReadPosition,
exception.getMessage());
- findEntryComplete(failedReadPosition.get(), ctx);
- return;
+ if (exception instanceof LedgerNotExistException) {
+ long failedLedgerId = failedReadPosition.get().getLedgerId();
+ ManagedLedgerImpl ledger = ((ManagedLedgerImpl)
cursor.getManagedLedger());
+ Position lastPositionInLedger =
ledger.getOptionalLedgerInfo(failedLedgerId)
+ .map(ledgerInfo -> PositionImpl.get(failedLedgerId,
ledgerInfo.getEntries() - 1))
+ .orElseGet(() -> {
+ Long nextExistingLedger =
ledger.getNextValidLedger(failedReadPosition.get().getLedgerId());
+ if (nextExistingLedger == null) {
+ log.info("[{}] [{}] Couldn't find next next
valid ledger for expiry monitor when find "
+ + "entry failed {}",
ledger.getName(), ledger.getName(),
+ failedReadPosition);
+ return (PositionImpl) failedReadPosition.get();
+ } else {
+ return PositionImpl.get(nextExistingLedger,
-1);
+ }
+ });
+ log.info("[{}][{}] ledger not existed, will complete the last
position of the non-existed"
+ + " ledger:{}", topicName, subName,
lastPositionInLedger);
+ findEntryComplete(lastPositionInLedger, ctx);
+ } else {
+ findEntryComplete(failedReadPosition.get(), ctx);
+ }
}
expirationCheckInProgress = FALSE;
updateRates();
diff --git
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 907aba67b9e..d86a749b7a6 100644
---
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.base.Predicate;
import io.netty.buffer.ByteBuf;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -347,6 +348,12 @@ public class MockManagedLedger implements ManagedLedger {
return CompletableFuture.completedFuture(build);
}
+ @Override
+ public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
+ final LedgerInfo build =
LedgerInfo.newBuilder().setLedgerId(ledgerId).setSize(100).setEntries(20).build();
+ return Optional.of(build);
+ }
+
@Override
public CompletableFuture<Void> asyncTruncate() {
return CompletableFuture.completedFuture(null);