This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a88ac659a69 [improve][broker] Improve cursor.getNumberOfEntries if
isUnackedRangesOpenCacheSetEnabled=true (#17465)
a88ac659a69 is described below
commit a88ac659a69763770dc3c3da272c03d6bfa8a897
Author: Penghui Li <[email protected]>
AuthorDate: Wed Sep 7 15:27:32 2022 +0800
[improve][broker] Improve cursor.getNumberOfEntries if
isUnackedRangesOpenCacheSetEnabled=true (#17465)
(cherry picked from commit 09edcceab419ef28a9311ac480c0335c8f9dd87e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 41 +++++++++++++---------
.../ConcurrentOpenLongPairRangeSet.java | 24 +++++++++++++
.../common/util/collections/LongPairRangeSet.java | 10 ++++++
.../ConcurrentOpenLongPairRangeSetTest.java | 20 +++++++++++
4 files changed, 78 insertions(+), 17 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index feeaffd1dd8..8db4d12f2fb 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1431,25 +1431,32 @@ public class ManagedCursorImpl implements ManagedCursor
{
lock.readLock().lock();
try {
- individualDeletedMessages.forEach((r) -> {
- try {
- if (r.isConnected(range)) {
- Range<PositionImpl> commonEntries =
r.intersection(range);
- long commonCount =
ledger.getNumberOfEntries(commonEntries);
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Discounting {} entries for
already deleted range {}", ledger.getName(),
- name, commonCount, commonEntries);
+ if (config.isUnackedRangesOpenCacheSetEnabled()) {
+ int cardinality = individualDeletedMessages.cardinality(
+ range.lowerEndpoint().ledgerId,
range.lowerEndpoint().entryId,
+ range.upperEndpoint().ledgerId,
range.upperEndpoint().entryId);
+ deletedEntries.addAndGet(cardinality);
+ } else {
+ individualDeletedMessages.forEach((r) -> {
+ try {
+ if (r.isConnected(range)) {
+ Range<PositionImpl> commonEntries =
r.intersection(range);
+ long commonCount =
ledger.getNumberOfEntries(commonEntries);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Discounting {} entries
for already deleted range {}",
+ ledger.getName(), name, commonCount,
commonEntries);
+ }
+ deletedEntries.addAndGet(commonCount);
+ }
+ return true;
+ } finally {
+ if (r.lowerEndpoint() instanceof
PositionImplRecyclable) {
+ ((PositionImplRecyclable)
r.lowerEndpoint()).recycle();
+ ((PositionImplRecyclable)
r.upperEndpoint()).recycle();
}
- deletedEntries.addAndGet(commonCount);
- }
- return true;
- } finally {
- if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
- ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
- ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
- }
- }, recyclePositionRangeConverter);
+ }, recyclePositionRangeConverter);
+ }
} finally {
lock.readLock().unlock();
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 6e647966938..9bdfc93bd11 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,6 +29,7 @@ import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.mutable.MutableInt;
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}.
This can be alternative of
@@ -242,6 +243,29 @@ public class ConcurrentOpenLongPairRangeSet<T extends
Comparable<T>> implements
return Range.openClosed(consumer.apply(lastSet.getKey(), lower),
consumer.apply(lastSet.getKey(), upper));
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey, long
upperValue) {
+ NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey,
true, upperKey, true);
+ MutableInt v = new MutableInt(0);
+ subMap.forEach((key, bitset) -> {
+ if (key == lowerKey || key == upperKey) {
+ BitSet temp = (BitSet) bitset.clone();
+ // Clear the bits whose index is < lowerValue
+ if (key == lowerKey) {
+ temp.clear(0, (int) Math.max(0, lowerValue));
+ }
+ // Clear the bits whose index is > upperValue
+ if (key == upperKey) {
+ temp.clear((int) Math.min(upperValue + 1, temp.length()),
temp.length());
+ }
+ v.add(temp.cardinality());
+ } else {
+ v.add(bitset.cardinality());
+ }
+ });
+ return v.intValue();
+ }
+
@Override
public int size() {
if (updatedAfterCachedForSize) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index a6da065c673..739bd6e7df6 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -126,6 +126,11 @@ public interface LongPairRangeSet<T extends Comparable<T>>
{
*/
Range<T> lastRange();
+ /**
+ * Return the number of bits set to true from lower (inclusive) to upper
(inclusive).
+ */
+ int cardinality(long lowerKey, long lowerValue, long upperKey, long
upperValue);
+
/**
* Represents a function that accepts two long arguments and produces a
result.
*
@@ -289,6 +294,11 @@ public interface LongPairRangeSet<T extends Comparable<T>>
{
return list.get(list.size() - 1);
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey,
long upperValue) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public int size() {
return set.asRanges().size();
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 2c0b8d3552c..5d9af2e0227 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -460,4 +460,24 @@ public class ConcurrentOpenLongPairRangeSetTest {
gRangeConnected.add(lastRange);
return gRangeConnected;
}
+
+ @Test
+ public void testCardinality() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new
ConcurrentOpenLongPairRangeSet<>(consumer);
+ int v = set.cardinality(0, 0, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ assertEquals(v, 0 );
+ set.addOpenClosed(1, 0, 1, 20);
+ set.addOpenClosed(1, 30, 1, 90);
+ set.addOpenClosed(2, 0, 3, 30);
+ v = set.cardinality(1, 0, 1, 100);
+ assertEquals(v, 80);
+ v = set.cardinality(1, 11, 1, 100);
+ assertEquals(v, 70);
+ v = set.cardinality(1, 0, 1, 90);
+ assertEquals(v, 80);
+ v = set.cardinality(1, 0, 1, 80);
+ assertEquals(v, 70);
+ v = set.cardinality(1, 0, 3, 30);
+ assertEquals(v, 80 + 31);
+ }
}