This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new e45071e08b9 Revert "[improve][broker] Optimize
`ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908)"
e45071e08b9 is described below
commit e45071e08b95f18733967d30112370dadba8a3c9
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 24 19:36:22 2024 +0300
Revert "[improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by
RoaringBitmap (#22908)"
This reverts commit 6b59c35e5fac43fa782a082026a7c56bcc453c38.
---
distribution/server/src/assemble/LICENSE.bin.txt | 3 +-
distribution/shell/src/assemble/LICENSE.bin.txt | 2 -
pom.xml | 2 +-
pulsar-common/pom.xml | 5 -
.../ConcurrentOpenLongPairRangeSet.java | 12 +-
.../util/collections/ConcurrentRoaringBitSet.java | 439 ---------------------
6 files changed, 10 insertions(+), 453 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt
b/distribution/server/src/assemble/LICENSE.bin.txt
index c2216378c27..16937c9f52d 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -513,7 +513,8 @@ The Apache Software License, Version 2.0
* RxJava
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- - org.roaringbitmap-RoaringBitmap-1.1.0.jar
+ - org.roaringbitmap-RoaringBitmap-0.9.44.jar
+ - org.roaringbitmap-shims-0.9.44.jar
* OpenTelemetry
- io.opentelemetry-opentelemetry-api-1.38.0.jar
- io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt
b/distribution/shell/src/assemble/LICENSE.bin.txt
index c7c9846c776..6b7e2768808 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -382,8 +382,6 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_common-0.16.0.jar
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
- * RoaringBitmap
- - RoaringBitmap-1.1.0.jar
* Log4J
- log4j-api-2.23.1.jar
- log4j-core-2.23.1.jar
diff --git a/pom.xml b/pom.xml
index 54ebfb087c9..fd5cd34bfd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -313,7 +313,7 @@ flexible messaging model and an intuitive client
API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>9.1.0</dependency-check-maven.version>
- <roaringbitmap.version>1.1.0</roaringbitmap.version>
+ <roaringbitmap.version>0.9.44</roaringbitmap.version>
<extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
<oshi.version>6.4.0</oshi.version>
<checkerframework.version>3.33.0</checkerframework.version>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index bfc0e0ac78f..1b57803a730 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -243,11 +243,6 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
-
- <dependency>
- <groupId>org.roaringbitmap</groupId>
- <artifactId>RoaringBitmap</artifactId>
- </dependency>
</dependencies>
<build>
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index b5ad89d1695..72215d7296c 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,7 +29,6 @@ import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.mutable.MutableInt;
-import org.roaringbitmap.RoaringBitSet;
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}.
This can be alternative of
@@ -45,7 +44,7 @@ import org.roaringbitmap.RoaringBitSet;
public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>>
implements LongPairRangeSet<T> {
protected final NavigableMap<Long, BitSet> rangeBitSetMap = new
ConcurrentSkipListMap<>();
- private final boolean threadSafe;
+ private boolean threadSafe = true;
private final int bitSetSize;
private final LongPairConsumer<T> consumer;
@@ -96,7 +95,9 @@ public class ConcurrentOpenLongPairRangeSet<T extends
Comparable<T>> implements
// (2) set 0th-index to upper-index in upperRange.getKey()
if (isValid(upperKey, upperValue)) {
BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey,
(key) -> createNewBitSet());
- rangeBitSet.set(0, (int) upperValue + 1);
+ if (rangeBitSet != null) {
+ rangeBitSet.set(0, (int) upperValue + 1);
+ }
}
// No-op if values are not valid eg: if lower == LongPair.earliest
or upper == LongPair.latest then nothing
// to set
@@ -413,6 +414,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends
Comparable<T>> implements
}
private BitSet createNewBitSet() {
- return this.threadSafe ? new ConcurrentRoaringBitSet() : new
RoaringBitSet();
+ return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new
BitSet(bitSetSize);
}
-}
\ No newline at end of file
+
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
deleted file mode 100644
index 814e5840099..00000000000
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import java.util.BitSet;
-import java.util.concurrent.locks.StampedLock;
-import java.util.stream.IntStream;
-import org.roaringbitmap.RoaringBitSet;
-
-public class ConcurrentRoaringBitSet extends RoaringBitSet {
- private final StampedLock rwLock = new StampedLock();
-
- public ConcurrentRoaringBitSet() {
- super();
- }
-
- @Override
- public boolean get(int bitIndex) {
- long stamp = rwLock.tryOptimisticRead();
- boolean isSet = super.get(bitIndex);
- if (!rwLock.validate(stamp)) {
- stamp = rwLock.readLock();
- try {
- isSet = super.get(bitIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return isSet;
- }
-
- @Override
- public void set(int bitIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.set(bitIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void clear(int bitIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.clear(bitIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void set(int fromIndex, int toIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.set(fromIndex, toIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void clear(int fromIndex, int toIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.clear(fromIndex, toIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void clear() {
- long stamp = rwLock.writeLock();
- try {
- super.clear();
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public int nextSetBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int nextSetBit = super.nextSetBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- nextSetBit = super.nextSetBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return nextSetBit;
- }
-
- @Override
- public int nextClearBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int nextClearBit = super.nextClearBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- nextClearBit = super.nextClearBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return nextClearBit;
- }
-
- @Override
- public int previousSetBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int previousSetBit = super.previousSetBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- previousSetBit = super.previousSetBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return previousSetBit;
- }
-
- @Override
- public int previousClearBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int previousClearBit = super.previousClearBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- previousClearBit = super.previousClearBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return previousClearBit;
- }
-
- @Override
- public int length() {
- long stamp = rwLock.tryOptimisticRead();
- int length = super.length();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- length = super.length();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return length;
- }
-
- @Override
- public boolean isEmpty() {
- long stamp = rwLock.tryOptimisticRead();
- boolean isEmpty = super.isEmpty();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- isEmpty = super.isEmpty();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return isEmpty;
- }
-
- @Override
- public int cardinality() {
- long stamp = rwLock.tryOptimisticRead();
- int cardinality = super.cardinality();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- cardinality = super.cardinality();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return cardinality;
- }
-
- @Override
- public int size() {
- long stamp = rwLock.tryOptimisticRead();
- int size = super.size();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- size = super.size();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return size;
- }
-
- @Override
- public byte[] toByteArray() {
- long stamp = rwLock.tryOptimisticRead();
- byte[] byteArray = super.toByteArray();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- byteArray = super.toByteArray();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return byteArray;
- }
-
- @Override
- public long[] toLongArray() {
- long stamp = rwLock.tryOptimisticRead();
- long[] longArray = super.toLongArray();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- longArray = super.toLongArray();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return longArray;
- }
-
- @Override
- public void flip(int bitIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.flip(bitIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void flip(int fromIndex, int toIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.flip(fromIndex, toIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void set(int bitIndex, boolean value) {
- long stamp = rwLock.writeLock();
- try {
- super.set(bitIndex, value);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void set(int fromIndex, int toIndex, boolean value) {
- long stamp = rwLock.writeLock();
- try {
- super.set(fromIndex, toIndex, value);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public BitSet get(int fromIndex, int toIndex) {
- long stamp = rwLock.tryOptimisticRead();
- BitSet bitSet = super.get(fromIndex, toIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- bitSet = super.get(fromIndex, toIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return bitSet;
- }
-
- @Override
- public boolean intersects(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- return super.intersects(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void and(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.and(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void or(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.or(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void xor(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.xor(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void andNot(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.andNot(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- /**
- * Returns the clone of the internal wrapped {@code BitSet}.
- * This won't be a clone of the {@code ConcurrentBitSet} object.
- *
- * @return a clone of the internal wrapped {@code BitSet}
- */
- @Override
- public Object clone() {
- long stamp = rwLock.tryOptimisticRead();
- RoaringBitSet clone = (RoaringBitSet) super.clone();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- clone = (RoaringBitSet) super.clone();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return clone;
- }
-
- @Override
- public String toString() {
- long stamp = rwLock.tryOptimisticRead();
- String str = super.toString();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- str = super.toString();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return str;
- }
-
- /**
- * This operation is not supported on {@code ConcurrentBitSet}.
- */
- @Override
- public IntStream stream() {
- throw new UnsupportedOperationException("stream is not supported");
- }
-
- public boolean equals(final Object o) {
- long stamp = rwLock.tryOptimisticRead();
- boolean isEqual = super.equals(o);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- isEqual = super.equals(o);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return isEqual;
- }
-
- public int hashCode() {
- long stamp = rwLock.tryOptimisticRead();
- int hashCode = super.hashCode();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- hashCode = super.hashCode();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return hashCode;
- }
-}