This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 67df31b4516 [improve][broker]Improve PersistentMessageExpiryMonitor 
expire speed when ledger not existed (#17842)
67df31b4516 is described below

commit 67df31b45164fa694c78f96f058cd53b30ff9b9a
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Mon Oct 3 23:13:03 2022 +0800

    [improve][broker]Improve PersistentMessageExpiryMonitor expire speed when 
ledger not existed (#17842)
    
    (cherry picked from commit af11c32611e41d708e76264d47ef9501534c6c5c)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  7 ++++++
 .../bookkeeper/mledger/ManagedLedgerException.java |  6 ++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 ++++++++++++++++++++--
 .../persistent/PersistentMessageExpiryMonitor.java | 25 +++++++++++++++++++++-
 .../offload/jcloud/impl/MockManagedLedger.java     |  7 ++++++
 5 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index cd39919a3b3..083ccd939b1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.mledger;
 
 import io.netty.buffer.ByteBuf;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
@@ -631,6 +632,12 @@ public interface ManagedLedger {
      */
     CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId);
 
+    /**
+     * Get basic ledger summary.
+     * will get {@link Optional#empty()} if corresponding ledger not exists.
+     */
+    Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId);
+
     /**
      * Truncate ledgers
      * The truncate operation will move all cursors to the end of the topic 
and delete all inactive ledgers.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 0dc820ec46d..fa6dc59d147 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -147,6 +147,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class LedgerNotExistException extends 
NonRecoverableLedgerException {
+        public LedgerNotExistException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class InvalidReplayPositionException extends 
ManagedLedgerException {
         public InvalidReplayPositionException(String msg) {
             super(msg);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 752f2fb960b..324ef67e16d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -100,6 +100,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.CursorNotFoundException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException;
@@ -1752,6 +1753,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return result;
     }
 
+    @Override
+    public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
+        return Optional.ofNullable(ledgers.get(ledgerId));
+    }
+
     CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
         CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
         if (ledgerHandle != null) {
@@ -1857,7 +1863,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         } else {
             log.error("[{}] Failed to get message with ledger {}:{} the 
ledgerId does not belong to this topic "
                     + "or has been deleted.", name, position.getLedgerId(), 
position.getEntryId());
-            callback.readEntryFailed(new 
ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+            callback.readEntryFailed(new LedgerNotExistException("Message not 
found, "
                     + "the ledgerId does not belong to this topic or has been 
deleted"), ctx);
         }
 
@@ -3660,11 +3666,26 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
         }
     }
 
+    private static boolean isLedgerNotExistException(int rc) {
+        switch (rc) {
+            case Code.NoSuchLedgerExistsException:
+            case Code.NoSuchLedgerExistsOnMetadataServerException:
+                return true;
+
+            default:
+                return false;
+        }
+    }
+
     public static ManagedLedgerException createManagedLedgerException(int 
bkErrorCode) {
         if (bkErrorCode == BKException.Code.TooManyRequestsException) {
             return new TooManyRequestsException("Too many request error from 
bookies");
         } else if (isBkErrorNotRecoverable(bkErrorCode)) {
-            return new 
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
+            if (isLedgerNotExistException(bkErrorCode)) {
+                return new 
LedgerNotExistException(BKException.getMessage(bkErrorCode));
+            } else {
+                return new 
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
+            }
         } else {
             return new 
ManagedLedgerException(BKException.getMessage(bkErrorCode));
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index c5b340330ad..32cff81a64f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -26,8 +26,10 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -190,7 +192,28 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
                 && (exception instanceof NonRecoverableLedgerException)) {
             log.warn("[{}][{}] read failed from ledger at position:{} : {}", 
topicName, subName, failedReadPosition,
                     exception.getMessage());
-            findEntryComplete(failedReadPosition.get(), ctx);
+            if (exception instanceof LedgerNotExistException) {
+                long failedLedgerId = failedReadPosition.get().getLedgerId();
+                ManagedLedgerImpl ledger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+                Position lastPositionInLedger = 
ledger.getOptionalLedgerInfo(failedLedgerId)
+                        .map(ledgerInfo -> PositionImpl.get(failedLedgerId, 
ledgerInfo.getEntries() - 1))
+                        .orElseGet(() -> {
+                            Long nextExistingLedger = 
ledger.getNextValidLedger(failedReadPosition.get().getLedgerId());
+                            if (nextExistingLedger == null) {
+                                log.info("[{}] [{}] Couldn't find next next 
valid ledger for expiry monitor when find "
+                                                + "entry failed {}", 
ledger.getName(), ledger.getName(),
+                                        failedReadPosition);
+                                return (PositionImpl) failedReadPosition.get();
+                            } else {
+                                return PositionImpl.get(nextExistingLedger, 
-1);
+                            }
+                        });
+                log.info("[{}][{}] ledger not existed, will complete the last 
position of the non-existed"
+                        + " ledger:{}", topicName, subName, 
lastPositionInLedger);
+                findEntryComplete(lastPositionInLedger, ctx);
+            } else {
+                findEntryComplete(failedReadPosition.get(), ctx);
+            }
         }
         expirationCheckInProgress = FALSE;
         updateRates();
diff --git 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 907aba67b9e..d86a749b7a6 100644
--- 
a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ 
b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.offload.jcloud.impl;
 import com.google.common.base.Predicate;
 import io.netty.buffer.ByteBuf;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -347,6 +348,12 @@ public class MockManagedLedger implements ManagedLedger {
         return CompletableFuture.completedFuture(build);
     }
 
+    @Override
+    public Optional<LedgerInfo> getOptionalLedgerInfo(long ledgerId) {
+        final LedgerInfo build = 
LedgerInfo.newBuilder().setLedgerId(ledgerId).setSize(100).setEntries(20).build();
+        return Optional.of(build);
+    }
+
     @Override
     public CompletableFuture<Void> asyncTruncate() {
         return CompletableFuture.completedFuture(null);

Reply via email to