This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d51f888320 [fix][broker]Fix ledgerHandle failed to read by using new
BK API (#25199)
6d51f888320 is described below
commit 6d51f8883206fd81840faddf4cec840f6969b137
Author: fengyubiao <[email protected]>
AuthorDate: Fri Feb 6 01:01:16 2026 +0800
[fix][broker]Fix ledgerHandle failed to read by using new BK API (#25199)
---
.../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++--
.../org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 9 +++++++++
.../mledger/impl/ManagedLedgerFactoryShutdownTest.java | 6 ++++++
.../broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java | 2 +-
.../pulsar/broker/service/schema/BookkeeperSchemaStorage.java | 2 +-
.../java/org/apache/pulsar/compaction/CompactedTopicImpl.java | 2 +-
7 files changed, 21 insertions(+), 6 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 1ab6ceab9a2..7009ac750fe 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -686,7 +686,7 @@ public class ManagedCursorImpl implements ManagedCursor {
};
try {
bookkeeper.asyncOpenLedger(ledgerId, digestType,
getConfig().getPassword(), openCallback,
- null);
+ null, true);
} catch (Throwable t) {
log.error("[{}] Encountered error on opening cursor ledger {} for
cursor {}",
ledger.getName(), ledgerId, name, t);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6dd886fe4ea..b7affb01929 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -491,7 +491,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
log.debug("[{}] Opening ledger {}", name, id);
}
mbean.startDataLedgerOpenOp();
- bookKeeper.asyncOpenLedger(id, digestType,
config.getPassword(), opencb, null);
+ bookKeeper.asyncOpenLedger(id, digestType,
config.getPassword(), opencb, null, true);
} else {
initializeBookKeeper(callback);
}
@@ -1918,7 +1918,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
handleBadVersion(new BadVersionException("the current ledger "
+ currentLedger.getId()
+ " was concurrent modified by a other bookie client. The
error code is: " + errorCode));
}
- }, null);
+ }, null, true);
}
synchronized void ledgerClosed(final LedgerHandle lh) {
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index f41269b49af..522e1290320 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -6045,6 +6045,15 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
super.asyncOpenLedger(lId, digestType, passwd, cb, ctx);
}
}
+
+ public void asyncOpenLedger(final long lId, final DigestType
digestType, final byte[] passwd,
+ final OpenCallback cb, final Object ctx, boolean
keepMetadataUpdate) {
+ if (ledgerErrors.containsKey(lId)) {
+ cb.openComplete(ledgerErrors.get(lId), null, ctx);
+ } else {
+ super.asyncOpenLedger(lId, digestType, passwd, cb, ctx,
keepMetadataUpdate);
+ }
+ }
}
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index ecc3423e292..95f0a6b8c77 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger.impl;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
@@ -137,6 +138,11 @@ public class ManagedLedgerFactoryShutdownTest {
cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
return null;
}).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(),
any());
+ doAnswer(inv -> {
+ AsyncCallback.OpenCallback cb = inv.getArgument(3,
AsyncCallback.OpenCallback.class);
+ cb.openComplete(0, ledgerHandle, inv.getArgument(4, Object.class));
+ return null;
+ }).when(bookKeeper).asyncOpenLedger(anyLong(), any(), any(), any(),
any(), anyBoolean());
doAnswer(inv -> {
AsyncCallback.CreateCallback cb = inv.getArgument(5,
AsyncCallback.CreateCallback.class);
cb.createComplete(0, newLedgerHandle, inv.getArgument(6,
Object.class));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index ba37092e88d..fa7408d7e15 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -213,7 +213,7 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
} else {
future.complete(handle);
}
- }, null
+ }, null, true
);
return future;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index b931239a32c..e38bf48f1fd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -630,7 +630,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
} else {
future.complete(handle);
}
- }, null
+ }, null, true
);
return future;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 21cc0e45acd..2ceaab83f22 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -184,7 +184,7 @@ public class CompactedTopicImpl implements CompactedTopic {
} else {
promise.complete(ledger);
}
- }, null);
+ }, null, true);
return promise.thenApply((ledger) -> new CompactedTopicContext(
ledger, createCache(ledger,
DEFAULT_MAX_CACHE_SIZE)));
}