This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1631fedda57c55342352e7a4cc4da31c4508d772
Author: fengyubiao <[email protected]>
AuthorDate: Fri Feb 6 01:01:16 2026 +0800

    [fix][broker]Fix ledgerHandle failed to read by using new BK API (#25199)
    
    (cherry picked from commit 6d51f8883206fd81840faddf4cec840f6969b137)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java    | 2 +-
 .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java    | 4 ++--
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java    | 9 +++++++++
 .../mledger/impl/ManagedLedgerFactoryShutdownTest.java           | 6 ++++++
 .../broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java   | 2 +-
 .../pulsar/broker/service/schema/BookkeeperSchemaStorage.java    | 2 +-
 .../java/org/apache/pulsar/compaction/CompactedTopicImpl.java    | 2 +-
 7 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0a0042dafb2..790c81cadc9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -685,7 +685,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         };
         try {
             bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback,
-                    null);
+                    null, true);
         } catch (Throwable t) {
             log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
                 ledger.getName(), ledgerId, name, t);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5b692c3f3be..6183e77de99 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -475,7 +475,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         log.debug("[{}] Opening ledger {}", name, id);
                     }
                     mbean.startDataLedgerOpenOp();
-                    bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null);
+                    bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null, true);
                 } else {
                     initializeBookKeeper(callback);
                 }
@@ -1812,7 +1812,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 handleBadVersion(new BadVersionException("the current ledger " + currentLedger.getId()
                     + " was concurrent modified by a other bookie client. The error code is: " + errorCode));
             }
-        }, null);
+        }, null, true);
     }
 
     synchronized void ledgerClosed(final LedgerHandle lh) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 0b9e3e6b08d..6b37c4a5c18 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -6035,6 +6035,15 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                 super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
             }
         }
+
+        public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd,
+                final OpenCallback cb, final Object ctx, boolean keepMetadataUpdate) {
+            if (ledgerErrors.containsKey(lId)) {
+                cb.openComplete(ledgerErrors.get(lId), null, ctx);
+            } else {
+                super.asyncOpenLedger(lId, digestType, passwd, cb, ctx, keepMetadataUpdate);
+            }
+        }
     }
 
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index ecc3423e292..95f0a6b8c77 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.mledger.impl;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -137,6 +138,11 @@ public class ManagedLedgerFactoryShutdownTest {
             cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
             return null;
         }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any());
+        doAnswer(inv -> {
+            AsyncCallback.OpenCallback cb = inv.getArgument(3, AsyncCallback.OpenCallback.class);
+            cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
+            return null;
+        }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any(), anyBoolean());
         doAnswer(inv -> {
             AsyncCallback.CreateCallback cb = inv.getArgument(5, AsyncCallback.CreateCallback.class);
             cb.createComplete(0, newLedgerHandle, inv.getArgument(6, Object.class));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index ba37092e88d..fa7408d7e15 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -213,7 +213,7 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
                     } else {
                         future.complete(handle);
                     }
-                }, null
+                }, null, true
         );
         return future;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b931239a32c..e38bf48f1fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -630,7 +630,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
                 } else {
                     future.complete(handle);
                 }
-            }, null
+            }, null, true
         );
         return future;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index c1469b407cf..160c1525480 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -184,7 +184,7 @@ public class CompactedTopicImpl implements CompactedTopic {
                                } else {
                                    promise.complete(ledger);
                                }
-                           }, null);
+                           }, null, true);
         return promise.thenApply((ledger) -> new CompactedTopicContext(
                                          ledger, createCache(ledger, DEFAULT_MAX_CACHE_SIZE)));
     }

Reply via email to