This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new edd0076bd83 [fix][misc] Make ConcurrentBitSet thread safe (#22361)
edd0076bd83 is described below

commit edd0076bd83f01a5fcbe81c8396667014f0fc36e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Mar 27 09:16:22 2024 -0700

    [fix][misc] Make ConcurrentBitSet thread safe (#22361)
---
 .../common/util/collections/ConcurrentBitSet.java  | 363 +++++++++++++++++++--
 1 file changed, 331 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
index 23842fe5b55..a37628cb300 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -20,12 +20,13 @@ package org.apache.pulsar.common.util.collections;
 
 import java.util.BitSet;
 import java.util.concurrent.locks.StampedLock;
-import lombok.EqualsAndHashCode;
+import java.util.stream.IntStream;
 
 /**
- * Safe multithreaded version of {@code BitSet}.
+ * A {@code BitSet} that is protected by a {@code StampedLock} to provide 
thread-safe access.
+ * The {@link #length()} method is not thread safe and is not overridden 
because StampedLock is not reentrant.
+ * Use the {@link #safeLength()} method to get the length of the bit set in a 
thread-safe manner.
  */
-@EqualsAndHashCode(callSuper = true)
 public class ConcurrentBitSet extends BitSet {
 
     private static final long serialVersionUID = 1L;
@@ -39,10 +40,8 @@ public class ConcurrentBitSet extends BitSet {
      * Creates a bit set whose initial size is large enough to explicitly 
represent bits with indices in the range
      * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
      *
-     * @param nbits
-     *            the initial size of the bit set
-     * @throws NegativeArraySizeException
-     *             if the specified initial size is negative
+     * @param nbits the initial size of the bit set
+     * @throws NegativeArraySizeException if the specified initial size is 
negative
      */
     public ConcurrentBitSet(int nbits) {
         super(nbits);
@@ -65,105 +64,405 @@ public class ConcurrentBitSet extends BitSet {
 
     @Override
     public void set(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear() {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear();
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public int nextSetBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        super.set(bitIndex);
+        int nextSetBit = super.nextSetBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                super.set(bitIndex);
+                nextSetBit = super.nextSetBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
+        return nextSetBit;
     }
 
     @Override
-    public void set(int fromIndex, int toIndex) {
+    public int nextClearBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        super.set(fromIndex, toIndex);
+        int nextClearBit = super.nextClearBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                super.set(fromIndex, toIndex);
+                nextClearBit = super.nextClearBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
+        return nextClearBit;
     }
 
     @Override
-    public int nextSetBit(int fromIndex) {
+    public int previousSetBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.nextSetBit(fromIndex);
+        int previousSetBit = super.previousSetBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.nextSetBit(fromIndex);
+                previousSetBit = super.previousSetBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return previousSetBit;
     }
 
     @Override
-    public int nextClearBit(int fromIndex) {
+    public int previousClearBit(int fromIndex) {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.nextClearBit(fromIndex);
+        int previousClearBit = super.previousClearBit(fromIndex);
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.nextClearBit(fromIndex);
+                previousClearBit = super.previousClearBit(fromIndex);
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return previousClearBit;
     }
 
     @Override
-    public int previousSetBit(int fromIndex) {
+    public boolean isEmpty() {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.previousSetBit(fromIndex);
+        boolean isEmpty = super.isEmpty();
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.previousSetBit(fromIndex);
+                isEmpty = super.isEmpty();
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return isEmpty;
     }
 
     @Override
-    public int previousClearBit(int fromIndex) {
+    public int cardinality() {
         long stamp = rwLock.tryOptimisticRead();
-        int bit = super.previousClearBit(fromIndex);
+        int cardinality = super.cardinality();
         if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                bit = super.previousClearBit(fromIndex);
+                cardinality = super.cardinality();
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return bit;
+        return cardinality;
     }
 
     @Override
-    public boolean isEmpty() {
+    public int size() {
         long stamp = rwLock.tryOptimisticRead();
-        boolean isEmpty = super.isEmpty();
+        int size = super.size();
         if (!rwLock.validate(stamp)) {
             // Fallback to read lock
             stamp = rwLock.readLock();
             try {
-                isEmpty = super.isEmpty();
+                size = super.size();
             } finally {
                 rwLock.unlockRead(stamp);
             }
         }
-        return isEmpty;
+        return size;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        long stamp = rwLock.tryOptimisticRead();
+        byte[] byteArray = super.toByteArray();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                byteArray = super.toByteArray();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return byteArray;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        long stamp = rwLock.tryOptimisticRead();
+        long[] longArray = super.toLongArray();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                longArray = super.toLongArray();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return longArray;
+    }
+
+    @Override
+    public void flip(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.flip(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void flip(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.flip(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int bitIndex, boolean value) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(bitIndex, value);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex, boolean value) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(fromIndex, toIndex, value);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public BitSet get(int fromIndex, int toIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        BitSet bitSet = super.get(fromIndex, toIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                bitSet = super.get(fromIndex, toIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bitSet;
+    }
+
+    /**
+     * Thread-safe version of {@code length()}.
+     * StampedLock is not reentrant and that's why the length() method is not 
overridden. Overriding length() method
+     * would require to use a reentrant lock which would be less performant.
+     *
+     * @return length of the bit set
+     */
+    public int safeLength() {
+        long stamp = rwLock.tryOptimisticRead();
+        int length = super.length();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                length = super.length();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return length;
+    }
+
+    @Override
+    public boolean intersects(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            return super.intersects(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void and(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.and(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void or(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.or(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void xor(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.xor(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void andNot(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.andNot(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Returns the clone of the internal wrapped {@code BitSet}.
+     * This won't be a clone of the {@code ConcurrentBitSet} object.
+     *
+     * @return a clone of the internal wrapped {@code BitSet}
+     */
+    @Override
+    public Object clone() {
+        long stamp = rwLock.tryOptimisticRead();
+        BitSet clonedBitSet = (BitSet) super.clone();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                clonedBitSet = (BitSet) super.clone();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return clonedBitSet;
+    }
+
+    @Override
+    public String toString() {
+        long stamp = rwLock.tryOptimisticRead();
+        String str = super.toString();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                str = super.toString();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return str;
+    }
+
+    /**
+     * This operation is not supported on {@code ConcurrentBitSet}.
+     */
+    @Override
+    public IntStream stream() {
+        throw new UnsupportedOperationException("stream is not supported");
+    }
+
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof ConcurrentBitSet)) {
+            return false;
+        }
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isEqual = super.equals(o);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                isEqual = super.equals(o);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isEqual;
+    }
+
+    public int hashCode() {
+        long stamp = rwLock.tryOptimisticRead();
+        int hashCode = super.hashCode();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                hashCode = super.hashCode();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return hashCode;
     }
 }

Reply via email to