This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1631fedda57c55342352e7a4cc4da31c4508d772 Author: fengyubiao <[email protected]> AuthorDate: Fri Feb 6 01:01:16 2026 +0800 [fix][broker]Fix ledgerHandle failed to read by using new BK API (#25199) (cherry picked from commit 6d51f8883206fd81840faddf4cec840f6969b137) --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++-- .../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 9 +++++++++ .../mledger/impl/ManagedLedgerFactoryShutdownTest.java | 6 ++++++ .../broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java | 2 +- .../pulsar/broker/service/schema/BookkeeperSchemaStorage.java | 2 +- .../java/org/apache/pulsar/compaction/CompactedTopicImpl.java | 2 +- 7 files changed, 21 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0a0042dafb2..790c81cadc9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -685,7 +685,7 @@ public class ManagedCursorImpl implements ManagedCursor { }; try { bookkeeper.asyncOpenLedger(ledgerId, digestType, getConfig().getPassword(), openCallback, - null); + null, true); } catch (Throwable t) { log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", ledger.getName(), ledgerId, name, t); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5b692c3f3be..6183e77de99 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -475,7 +475,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Opening ledger {}", name, id); } mbean.startDataLedgerOpenOp(); - bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null); + bookKeeper.asyncOpenLedger(id, digestType, config.getPassword(), opencb, null, true); } else { initializeBookKeeper(callback); } @@ -1812,7 +1812,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { handleBadVersion(new BadVersionException("the current ledger " + currentLedger.getId() + " was concurrent modified by a other bookie client. The error code is: " + errorCode)); } - }, null); + }, null, true); } synchronized void ledgerClosed(final LedgerHandle lh) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 0b9e3e6b08d..6b37c4a5c18 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -6035,6 +6035,15 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { super.asyncOpenLedger(lId, digestType, passwd, cb, ctx); } } + + public void asyncOpenLedger(final long lId, final DigestType digestType, final byte[] passwd, + final OpenCallback cb, final Object ctx, boolean keepMetadataUpdate) { + if (ledgerErrors.containsKey(lId)) { + cb.openComplete(ledgerErrors.get(lId), null, ctx); + } else { + super.asyncOpenLedger(lId, digestType, passwd, cb, ctx, keepMetadataUpdate); + } + } } private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java index ecc3423e292..95f0a6b8c77 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -137,6 +138,11 @@ public class ManagedLedgerFactoryShutdownTest { cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class)); return null; }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any()); + doAnswer(inv -> { + AsyncCallback.OpenCallback cb = inv.getArgument(3, AsyncCallback.OpenCallback.class); + cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class)); + return null; + }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(), any(), anyBoolean()); doAnswer(inv -> { AsyncCallback.CreateCallback cb = inv.getArgument(5, AsyncCallback.CreateCallback.class); cb.createComplete(0, newLedgerHandle, inv.getArgument(6, Object.class)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index ba37092e88d..fa7408d7e15 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -213,7 +213,7 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { } else { future.complete(handle); } - }, null + }, null, true ); return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index b931239a32c..e38bf48f1fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -630,7 +630,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { } else { future.complete(handle); } - }, null + }, null, true ); return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index c1469b407cf..160c1525480 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -184,7 +184,7 @@ public class CompactedTopicImpl implements CompactedTopic { } else { promise.complete(ledger); } - }, null); + }, null, true); return promise.thenApply((ledger) -> new CompactedTopicContext( ledger, createCache(ledger, DEFAULT_MAX_CACHE_SIZE))); }
