This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9f66181fa [improve][broker] Support values up to 2^32 in ConcurrentBitmapSortedLongPairSet (#23878)
0f9f66181fa is described below

commit 0f9f66181fade7b5163372591d2b299b6ba31780
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 23 16:02:45 2025 +0200

    [improve][broker] Support values up to 2^32 in ConcurrentBitmapSortedLongPairSet (#23878)
---
 .../utils/ConcurrentBitmapSortedLongPairSet.java   | 14 +++++++---
 .../ConcurrentBitmapSortedLongPairSetTest.java     | 30 +++++++++++++++++++---
 2 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
index 7a4126fedec..70437d07dbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.utils;
 
-import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -29,8 +28,13 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
+/**
+ * A concurrent set of pairs of longs.
+ * The right side of the value supports unsigned values up to 2^32.
+ */
 public class ConcurrentBitmapSortedLongPairSet {
 
     private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
@@ -139,10 +143,12 @@ public class ConcurrentBitmapSortedLongPairSet {
         lock.readLock().lock();
         try {
             for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
-                Iterator<Integer> iterator = entry.getValue().stream().iterator();
+                PeekableIntIterator intIterator = entry.getValue().getIntIterator();
                 boolean continueProcessing = true;
-                while (continueProcessing && iterator.hasNext()) {
-                    T item = longPairConverter.apply(entry.getKey(), iterator.next());
+                while (continueProcessing && intIterator.hasNext()) {
+                    // RoaringBitmap encodes values as unsigned 32-bit integers internally, it's necessary to use
+                    // Integer.toUnsignedLong to convert them to unsigned long values
+                    T item = longPairConverter.apply(entry.getKey(), Integer.toUnsignedLong(intIterator.next()));
                     continueProcessing = itemProcessor.process(item);
                 }
                 if (!continueProcessing) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
index 5f8f13288cf..34f971e8841 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
@@ -18,18 +18,19 @@
  */
 package org.apache.pulsar.utils;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-import lombok.Cleanup;
-import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
-import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import lombok.Cleanup;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
+import org.testng.annotations.Test;
 
 @Test(groups = "utils")
 public class ConcurrentBitmapSortedLongPairSetTest {
@@ -204,4 +205,27 @@ public class ConcurrentBitmapSortedLongPairSetTest {
 
         assertEquals(set.size(), N * nThreads);
     }
+
+    @Test
+    public void testValueLargerThanIntegerMAX_VALUE() {
+        ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+        long baseValue = Integer.MAX_VALUE;
+        List<Long> addedValues = new ArrayList<>();
+        int items = 10;
+        for (int i = 0; i < items; i++) {
+            long value = baseValue + i;
+            set.add(1, value);
+            addedValues.add(value);
+        }
+        assertEquals(set.size(), items);
+        List<Long> values = new ArrayList<>();
+        set.processItems((item1, item2) -> {
+            assertEquals(item1, 1);
+            return item2;
+        }, (value) -> {
+            values.add(value);
+            return true;
+        });
+        assertThat(values).containsExactlyElementsOf(addedValues);
+    }
 }

Reply via email to