This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9f66181fa [improve][broker] Support values up to 2^32 in ConcurrentBitmapSortedLongPairSet (#23878)
0f9f66181fa is described below
commit 0f9f66181fade7b5163372591d2b299b6ba31780
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 23 16:02:45 2025 +0200
[improve][broker] Support values up to 2^32 in ConcurrentBitmapSortedLongPairSet (#23878)
---
.../utils/ConcurrentBitmapSortedLongPairSet.java | 14 +++++++---
.../ConcurrentBitmapSortedLongPairSetTest.java | 30 +++++++++++++++++++---
2 files changed, 37 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
index 7a4126fedec..70437d07dbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.utils;
-import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
@@ -29,8 +28,13 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.RoaringBitmap;
+/**
+ * A concurrent set of pairs of longs.
+ * The right side of the value supports unsigned values up to 2^32.
+ */
public class ConcurrentBitmapSortedLongPairSet {
private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
@@ -139,10 +143,12 @@ public class ConcurrentBitmapSortedLongPairSet {
lock.readLock().lock();
try {
for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
- Iterator<Integer> iterator = entry.getValue().stream().iterator();
+ PeekableIntIterator intIterator = entry.getValue().getIntIterator();
boolean continueProcessing = true;
- while (continueProcessing && iterator.hasNext()) {
- T item = longPairConverter.apply(entry.getKey(), iterator.next());
+ while (continueProcessing && intIterator.hasNext()) {
+ // RoaringBitmap encodes values as unsigned 32-bit integers internally, it's necessary to use
+ // Integer.toUnsignedLong to convert them to unsigned long values
+ T item = longPairConverter.apply(entry.getKey(), Integer.toUnsignedLong(intIterator.next()));
continueProcessing = itemProcessor.process(item);
}
if (!continueProcessing) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
index 5f8f13288cf..34f971e8841 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
@@ -18,18 +18,19 @@
*/
package org.apache.pulsar.utils;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import lombok.Cleanup;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
-import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import lombok.Cleanup;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
+import org.testng.annotations.Test;
@Test(groups = "utils")
public class ConcurrentBitmapSortedLongPairSetTest {
@@ -204,4 +205,27 @@ public class ConcurrentBitmapSortedLongPairSetTest {
assertEquals(set.size(), N * nThreads);
}
+
+ @Test
+ public void testValueLargerThanIntegerMAX_VALUE() {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+ long baseValue = Integer.MAX_VALUE;
+ List<Long> addedValues = new ArrayList<>();
+ int items = 10;
+ for (int i = 0; i < items; i++) {
+ long value = baseValue + i;
+ set.add(1, value);
+ addedValues.add(value);
+ }
+ assertEquals(set.size(), items);
+ List<Long> values = new ArrayList<>();
+ set.processItems((item1, item2) -> {
+ assertEquals(item1, 1);
+ return item2;
+ }, (value) -> {
+ values.add(value);
+ return true;
+ });
+ assertThat(values).containsExactlyElementsOf(addedValues);
+ }
}