This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8332499ab2d [improve][ml] Clean up managed-ledger code: deduplicate
logic and use shared utilities (#25298)
8332499ab2d is described below
commit 8332499ab2d1b2a5863fc495e312d2d29c162c70
Author: Penghui Li <[email protected]>
AuthorDate: Mon Mar 9 05:48:27 2026 -0700
[improve][ml] Clean up managed-ledger code: deduplicate logic and use
shared utilities (#25298)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 6 ++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 37 +++++++++-------------
.../mledger/impl/NullLedgerOffloader.java | 13 +++-----
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 16 +++-------
.../mledger/impl/cache/RangeEntryCacheImpl.java | 26 +++++++--------
5 files changed, 38 insertions(+), 60 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 97333fbb1e3..b8b5b78a62c 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -1050,8 +1050,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
- if (rc ==
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
- || rc ==
BKException.Code.NoSuchLedgerExistsException) {
+ if
(Errors.isNoSuchLedgerExistsException(rc)) {
log.info("Ledger {} does not exist,
ignoring", li.ledgerId);
return null;
}
@@ -1092,8 +1091,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
.handle((result, ex) -> {
if (ex != null) {
int rc = BKException.getExceptionCode(ex);
- if (rc ==
BKException.Code.NoSuchLedgerExistsOnMetadataServerException
- || rc ==
BKException.Code.NoSuchLedgerExistsException) {
+ if (Errors.isNoSuchLedgerExistsException(rc)) {
log.info("Ledger {} does not exist, ignoring",
cursor.cursorsLedgerId);
return null;
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ce0bf59edc7..0606c6fb074 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -48,7 +48,6 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -4515,22 +4514,11 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
}
- private static boolean isLedgerNotExistException(int rc) {
- switch (rc) {
- case Code.NoSuchLedgerExistsException:
- case Code.NoSuchLedgerExistsOnMetadataServerException:
- return true;
-
- default:
- return false;
- }
- }
-
public static ManagedLedgerException createManagedLedgerException(int
bkErrorCode) {
if (bkErrorCode == BKException.Code.TooManyRequestsException) {
return new TooManyRequestsException("Too many request error from
bookies");
} else if (isBkErrorNotRecoverable(bkErrorCode)) {
- if (isLedgerNotExistException(bkErrorCode)) {
+ if (isNoSuchLedgerExistsException(bkErrorCode)) {
return new
LedgerNotExistException(BKException.getMessage(bkErrorCode));
} else {
return new
NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
@@ -4541,16 +4529,21 @@ public class ManagedLedgerImpl implements
ManagedLedger, CreateCallback {
}
public static ManagedLedgerException
createManagedLedgerException(Throwable t) {
- if (t instanceof org.apache.bookkeeper.client.api.BKException) {
- return
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException)
t).getCode());
- } else if (t instanceof ManagedLedgerException) {
- return (ManagedLedgerException) t;
- } else if (t instanceof CompletionException
- && !(t.getCause() instanceof CompletionException) /* check to
avoid stackoverlflow */) {
- return createManagedLedgerException(t.getCause());
+ if (t == null) {
+ return new ManagedLedgerException("Unknown exception");
+ }
+ Throwable cause = FutureUtil.unwrapCompletionException(t);
+ if (cause == null) {
+ log.error("Exception with null cause for ManagedLedgerException.",
t);
+ return new ManagedLedgerException("Unknown exception", t);
+ }
+ if (cause instanceof org.apache.bookkeeper.client.api.BKException) {
+ return
createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException)
cause).getCode());
+ } else if (cause instanceof ManagedLedgerException) {
+ return (ManagedLedgerException) cause;
} else {
- log.error("Unknown exception for ManagedLedgerException.", t);
- return new ManagedLedgerException("Other exception", t);
+ log.error("Unknown exception for ManagedLedgerException.", cause);
+ return new ManagedLedgerException("Other exception", cause);
}
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
index fe646bc82e5..807a703d540 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NullLedgerOffloader.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
/**
* Null implementation that throws an error on any invokation.
@@ -40,25 +41,19 @@ public class NullLedgerOffloader implements LedgerOffloader
{
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uid,
Map<String, String> extraMetadata) {
- CompletableFuture<Void> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
- return promise;
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
Map<String, String>
offloadDriverMetadata) {
- CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
- return promise;
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String>
offloadDriverMetadata) {
- CompletableFuture<Void> promise = new CompletableFuture<>();
- promise.completeExceptionally(new UnsupportedOperationException());
- return promise;
+ return FutureUtil.failedFuture(new UnsupportedOperationException());
}
@Override
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index bcd87e62629..bf452166920 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -288,16 +288,7 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
}
} else {
updateLatency();
- AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
- if (cb != null) {
- cb.addComplete(lastEntry, data.asReadOnly(), ctx);
- ml.notifyCursors();
- ml.notifyWaitingEntryCallBacks();
- ReferenceCountUtil.release(data);
- this.recycle();
- } else {
- ReferenceCountUtil.release(data);
- }
+ completeAdd(lastEntry, ctx);
}
}
@@ -314,10 +305,13 @@ public class OpAddEntry implements AddCallback,
CloseCallback, Runnable, Managed
ml.ledgerClosed(lh);
updateLatency();
+ completeAdd(PositionFactory.create(lh.getId(), entryId), ctx);
+ }
+ private void completeAdd(Position pos, Object ctx) {
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
- cb.addComplete(PositionFactory.create(lh.getId(), entryId),
data.asReadOnly(), ctx);
+ cb.addComplete(pos, data.asReadOnly(), ctx);
ml.notifyCursors();
ml.notifyWaitingEntryCallBacks();
ReferenceCountUtil.release(data);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index 4dc60b6434f..352ac341e56 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -203,15 +203,7 @@ public class RangeEntryCacheImpl implements EntryCache {
return;
}
- Pair<Integer, Long> removed = entries.removeRange(firstPosition,
lastPosition, false);
- int entriesRemoved = removed.getLeft();
- long sizeRemoved = removed.getRight();
- if (log.isTraceEnabled()) {
- log.trace("[{}] Invalidated entries up to {} - Entries removed: {}
- Size removed: {}", ml.getName(),
- lastPosition, entriesRemoved, sizeRemoved);
- }
-
- manager.entriesRemoved(sizeRemoved, entriesRemoved);
+ removeRangeAndNotify(firstPosition, lastPosition);
}
@Override
@@ -219,16 +211,22 @@ public class RangeEntryCacheImpl implements EntryCache {
final Position firstPosition = PositionFactory.create(ledgerId, 0);
final Position lastPosition = PositionFactory.create(ledgerId + 1, 0);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Invalidating all entries on ledger {}",
ml.getName(), ledgerId);
+ }
+ removeRangeAndNotify(firstPosition, lastPosition);
+ pendingReadsManager.invalidateLedger(ledgerId);
+ }
+
+ private void removeRangeAndNotify(Position firstPosition, Position
lastPosition) {
Pair<Integer, Long> removed = entries.removeRange(firstPosition,
lastPosition, false);
int entriesRemoved = removed.getLeft();
long sizeRemoved = removed.getRight();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Invalidated all entries on ledger {} - Entries
removed: {} - Size removed: {}",
- ml.getName(), ledgerId, entriesRemoved, sizeRemoved);
+ if (log.isTraceEnabled()) {
+ log.trace("[{}] Invalidated entries in range [{}, {}] - Entries
removed: {} - Size removed: {}",
+ ml.getName(), firstPosition, lastPosition, entriesRemoved,
sizeRemoved);
}
-
manager.entriesRemoved(sizeRemoved, entriesRemoved);
- pendingReadsManager.invalidateLedger(ledgerId);
}
@Override