This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2c9e7296bf2 [improve][ml] Remove the redundant judgment logic of
ManagedCursorImpl (#18205)
2c9e7296bf2 is described below
commit 2c9e7296bf259191ee059e2a7cd7720c256b8c3c
Author: HuangZeGui <[email protected]>
AuthorDate: Thu Oct 27 11:57:20 2022 +0800
[improve][ml] Remove the redundant judgment logic of ManagedCursorImpl
(#18205)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index ce0619a9865..4fa73a20273 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -566,7 +566,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
}
- if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null
+ if (config.isDeletionAtBatchIndexLevelEnabled()
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() >
0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
@@ -1227,7 +1227,7 @@ public class ManagedCursorImpl implements ManagedCursor {
lastMarkDeleteEntry = new
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor()
? getProperties() : Collections.emptyMap(), null,
null);
individualDeletedMessages.clear();
- if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
+ if (config.isDeletionAtBatchIndexLevelEnabled()) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
long[] resetWords = newPosition.ackSet;
@@ -1866,7 +1866,7 @@ public class ManagedCursorImpl implements ManagedCursor {
PositionImpl newPosition = (PositionImpl) position;
- if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes
!= null) {
+ if (config.isDeletionAtBatchIndexLevelEnabled()) {
if (newPosition.ackSet != null) {
AtomicReference<BitSetRecyclable> bitSetRecyclable = new
AtomicReference<>();
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(newPosition.ackSet);
@@ -2049,7 +2049,7 @@ public class ManagedCursorImpl implements ManagedCursor {
try {
individualDeletedMessages.removeAtMost(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId());
- if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
+ if (config.isDeletionAtBatchIndexLevelEnabled()) {
Map<PositionImpl, BitSetRecyclable> subMap =
batchDeletedIndexes.subMap(PositionImpl.EARLIEST,
false,
PositionImpl.get(mdEntry.newPosition.getLedgerId(),
mdEntry.newPosition.getEntryId()), true);
@@ -2178,7 +2178,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (individualDeletedMessages.contains(position.getLedgerId(),
position.getEntryId())
|| position.compareTo(markDeletePosition) <= 0) {
- if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
+ if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable =
batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
@@ -2190,7 +2190,7 @@ public class ManagedCursorImpl implements ManagedCursor {
continue;
}
if (position.ackSet == null) {
- if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
+ if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable =
batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
@@ -2207,7 +2207,7 @@ public class ManagedCursorImpl implements ManagedCursor {
log.debug("[{}] [{}] Individually deleted messages:
{}", ledger.getName(), name,
individualDeletedMessages);
}
- } else if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
+ } else if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable givenBitSet =
BitSetRecyclable.create().resetWords(position.ackSet);
BitSetRecyclable bitSet =
batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);
if (givenBitSet != bitSet) {
@@ -2862,8 +2862,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private List<MLDataFormats.BatchedEntryDeletionIndexInfo>
buildBatchEntryDeletionIndexInfoList() {
lock.readLock().lock();
try {
- if (!config.isDeletionAtBatchIndexLevelEnabled() ||
batchDeletedIndexes == null
- || batchDeletedIndexes.isEmpty()) {
+ if (!config.isDeletionAtBatchIndexLevelEnabled() ||
batchDeletedIndexes.isEmpty()) {
return Collections.emptyList();
}
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder =
MLDataFormats.NestedPositionInfo
@@ -3314,7 +3313,7 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public long[] getDeletedBatchIndexesAsLongArray(PositionImpl position) {
- if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes
!= null) {
+ if (config.isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSet = batchDeletedIndexes.get(position);
return bitSet == null ? null : bitSet.toLongArray();
} else {