This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 781a02b2085 [cleanup][ml] ManagedCursor clean up. (#22246)
781a02b2085 is described below
commit 781a02b20859e61361f1d18c369c5d00d1b2f7fd
Author: 道君 <[email protected]>
AuthorDate: Tue Mar 12 23:36:59 2024 +0800
[cleanup][ml] ManagedCursor clean up. (#22246)
---
.../java/org/apache/bookkeeper/mledger/impl/EntryImpl.java | 7 ++++++-
.../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++--------
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
index 6512399173f..80397931357 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java
@@ -42,6 +42,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
private long timestamp;
private long ledgerId;
private long entryId;
+ private PositionImpl position;
ByteBuf data;
private Runnable onDeallocate;
@@ -151,7 +152,10 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
@Override
public PositionImpl getPosition() {
- return new PositionImpl(ledgerId, entryId);
+ if (position == null) {
+ position = PositionImpl.get(ledgerId, entryId);
+ }
+ return position;
}
@Override
@@ -197,6 +201,7 @@ public final class EntryImpl extends AbstractCASReferenceCounted implements Entr
timestamp = -1;
ledgerId = -1;
entryId = -1;
+ position = null;
recyclerHandle.recycle(this);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 0c8dedd6b21..7065af203da 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1506,10 +1506,7 @@ public class ManagedCursorImpl implements ManagedCursor {
Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
lock.readLock().lock();
try {
- positions.stream()
- .filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0
- || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId()))
- .forEach(alreadyAcknowledgedPositions::add);
+ positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
@@ -2281,8 +2278,7 @@ public class ManagedCursorImpl implements ManagedCursor {
return;
}
- if (position.compareTo(markDeletePosition) <= 0
- || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) {
+ if (isMessageDeleted(position)) {
if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
@@ -3517,8 +3513,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void trimDeletedEntries(List<Entry> entries) {
entries.removeIf(entry -> {
- boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0
- || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId());
+ boolean isDeleted = isMessageDeleted(entry.getPosition());
if (isDeleted) {
entry.release();
}