This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ea2aa4eb8042feef467a0e2de8f4b0e0638c8de3 Author: Penghui Li <[email protected]> AuthorDate: Wed Sep 7 15:27:32 2022 +0800 [improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 41 +++++++++++++--------- .../bookkeeper/mledger/impl/RangeSetWrapper.java | 5 +++ .../ConcurrentOpenLongPairRangeSet.java | 23 ++++++++++++ .../common/util/collections/LongPairRangeSet.java | 10 ++++++ .../ConcurrentOpenLongPairRangeSetTest.java | 20 +++++++++++ 5 files changed, 82 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 8dee026f57c..25f1d8760b9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1453,25 +1453,32 @@ public class ManagedCursorImpl implements ManagedCursor { lock.readLock().lock(); try { - individualDeletedMessages.forEach((r) -> { - try { - if (r.isConnected(range)) { - Range<PositionImpl> commonEntries = r.intersection(range); - long commonCount = ledger.getNumberOfEntries(commonEntries); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Discounting {} entries for already deleted range {}", ledger.getName(), - name, commonCount, commonEntries); + if (config.isUnackedRangesOpenCacheSetEnabled()) { + int cardinality = individualDeletedMessages.cardinality( + range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId, + range.upperEndpoint().ledgerId, range.upperEndpoint().entryId); + deletedEntries.addAndGet(cardinality); + } else { + individualDeletedMessages.forEach((r) -> { + try { + if (r.isConnected(range)) { + Range<PositionImpl> commonEntries = r.intersection(range); + long commonCount = ledger.getNumberOfEntries(commonEntries); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Discounting {} entries for already deleted range {}", + ledger.getName(), name, commonCount, commonEntries); + } + deletedEntries.addAndGet(commonCount); + } + return true; + } finally { + if (r.lowerEndpoint() instanceof PositionImplRecyclable) { + ((PositionImplRecyclable) r.lowerEndpoint()).recycle(); + ((PositionImplRecyclable) r.upperEndpoint()).recycle(); } - deletedEntries.addAndGet(commonCount); - } - return true; - } finally { - if (r.lowerEndpoint() instanceof PositionImplRecyclable) { - ((PositionImplRecyclable) r.lowerEndpoint()).recycle(); - ((PositionImplRecyclable) r.upperEndpoint()).recycle(); } - } - }, recyclePositionRangeConverter); + }, recyclePositionRangeConverter); + } } finally { lock.readLock().unlock(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java index b0314f4e775..02d7967f9fc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java @@ -133,6 +133,11 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe return rangeSet.lastRange(); } + @Override + public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) { + return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue); + } + @VisibleForTesting void add(Range<LongPair> range) { if (!(rangeSet instanceof ConcurrentOpenLongPairRangeSet)) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index 05d16c4b054..a71c5ceb8de 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -242,6 +242,29 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper)); } + @Override + public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) { + NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true); + MutableInt v = new MutableInt(0); + subMap.forEach((key, bitset) -> { + if (key == lowerKey || key == upperKey) { + BitSet temp = (BitSet) bitset.clone(); + // Trim the bitset index which < lowerValue + if (key == lowerKey) { + temp.clear(0, (int) Math.max(0, lowerValue)); + } + // Trim the bitset index which > upperValue + if (key == upperKey) { + temp.clear((int) Math.min(upperValue + 1, temp.length()), temp.length()); + } + v.add(temp.cardinality()); + } else { + v.add(bitset.cardinality()); + } + }); + return v.intValue(); + } + @Override public int size() { if (updatedAfterCachedForSize) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java index ba77ff4b839..d804900ed42 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java @@ -125,6 +125,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> { */ Range<T> lastRange(); + /** + * Return the number bit sets to true from lower (inclusive) to upper (inclusive). + */ + int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue); + /** * Represents a function that accepts two long arguments and produces a result. * @@ -296,6 +301,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> { return list.get(list.size() - 1); } + @Override + public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) { + throw new UnsupportedOperationException(); + } + @Override public int size() { return set.asRanges().size(); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java index 2c0b8d3552c..5d9af2e0227 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java @@ -460,4 +460,24 @@ public class ConcurrentOpenLongPairRangeSetTest { gRangeConnected.add(lastRange); return gRangeConnected; } + + @Test + public void testCardinality() { + ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer); + int v = set.cardinality(0, 0, Integer.MAX_VALUE, Integer.MAX_VALUE); + assertEquals(v, 0 ); + set.addOpenClosed(1, 0, 1, 20); + set.addOpenClosed(1, 30, 1, 90); + set.addOpenClosed(2, 0, 3, 30); + v = set.cardinality(1, 0, 1, 100); + assertEquals(v, 80); + v = set.cardinality(1, 11, 1, 100); + assertEquals(v, 70); + v = set.cardinality(1, 0, 1, 90); + assertEquals(v, 80); + v = set.cardinality(1, 0, 1, 80); + assertEquals(v, 70); + v = set.cardinality(1, 0, 3, 30); + assertEquals(v, 80 + 31); + } }
