This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d55145e07f9 [improve][broker] Use RoaringBitmap in tracking individual acks to reduce memory usage (#23006)
d55145e07f9 is described below
commit d55145e07f940617f0673ad9405eadf0b2ded7cd
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Jul 6 13:29:00 2024 +0300
[improve][broker] Use RoaringBitmap in tracking individual acks to reduce memory usage (#23006)
(cherry picked from commit ed39c4db671c29057e51b9142a0d4cdb71e3eb88)
---
distribution/server/src/assemble/LICENSE.bin.txt | 3 +--
distribution/shell/src/assemble/LICENSE.bin.txt | 2 ++
.../apache/bookkeeper/mledger/impl/RangeSetWrapper.java | 2 +-
pom.xml | 2 +-
pulsar-common/pom.xml | 5 +++++
.../common/util/collections/OpenLongPairRangeSet.java | 15 ++-------------
6 files changed, 12 insertions(+), 17 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 498c069fea9..f46b18347c1 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -513,8 +513,7 @@ The Apache Software License, Version 2.0
* RxJava
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- - org.roaringbitmap-RoaringBitmap-0.9.44.jar
- - org.roaringbitmap-shims-0.9.44.jar
+ - org.roaringbitmap-RoaringBitmap-1.2.0.jar
* OpenTelemetry
- io.opentelemetry-opentelemetry-api-1.38.0.jar
- io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 33c7f1b0e2f..5421257a268 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -382,6 +382,8 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_common-0.16.0.jar
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
+ * RoaringBitmap
+ - RoaringBitmap-1.2.0.jar
* Log4J
- log4j-api-2.23.1.jar
- log4j-core-2.23.1.jar
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index 299fd3dc74c..c193d71c64f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -55,7 +55,7 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
this.config = managedCursor.getManagedLedger().getConfig();
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
- ? new OpenLongPairRangeSet<>(4096, rangeConverter)
+ ? new OpenLongPairRangeSet<>(rangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
}
diff --git a/pom.xml b/pom.xml
index 8f39806ceeb..d146e10e375 100644
--- a/pom.xml
+++ b/pom.xml
@@ -313,7 +313,7 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>10.0.1</dependency-check-maven.version>
- <roaringbitmap.version>0.9.44</roaringbitmap.version>
+ <roaringbitmap.version>1.2.0</roaringbitmap.version>
<extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
<oshi.version>6.4.0</oshi.version>
<checkerframework.version>3.33.0</checkerframework.version>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 1b57803a730..bfc0e0ac78f 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -243,6 +243,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java
index c053c106be2..5114675324a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/OpenLongPairRangeSet.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.lang.mutable.MutableInt;
+import org.roaringbitmap.RoaringBitSet;
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
@@ -46,8 +47,6 @@ import org.apache.commons.lang.mutable.MutableInt;
public class OpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>();
- private boolean threadSafe = true;
- private final int bitSetSize;
private final LongPairConsumer<T> consumer;
// caching place-holder for cpu-optimization to avoid calculating ranges again
@@ -57,16 +56,6 @@ public class OpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRa
private volatile boolean updatedAfterCachedForToString = true;
public OpenLongPairRangeSet(LongPairConsumer<T> consumer) {
- this(1024, true, consumer);
- }
-
- public OpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
- this(size, true, consumer);
- }
-
- public OpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
- this.threadSafe = threadSafe;
- this.bitSetSize = size;
this.consumer = consumer;
}
@@ -416,7 +405,7 @@ public class OpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRa
}
private BitSet createNewBitSet() {
- return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
+ return new RoaringBitSet();
}
}