merlimat commented on a change in pull request #3818: [pulsar-common] add open 
Concurrent LongPair RangeSet
URL: https://github.com/apache/pulsar/pull/3818#discussion_r285224145
 
 

 ##########
 File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
 ##########
 @@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.StampedLock;
+
+import 
org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
+import 
org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeProcessor;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+
+/**
+ * A concurrent set comprising zero or more ranges of type {@link LongPair}. This can be used as an alternative to
+ * {@link com.google.common.collect.RangeSet} when the {@code range} type is {@link LongPair}.<br>
+ * 
+ * <pre>
+ *  
+ * Usage:
+ * a. This can be used if one doesn't want to create an object for every newly inserted {@code range}.
+ * b. It creates a {@link BitSet} for every unique first key of the range,
+ *    so this range set is not suitable for a large number of unique keys.
+ * </pre>
+ * 
+ *
+ */
+public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> 
implements LongPairRangeSet<T> {
+
+    protected final NavigableMap<Long, BitSet> rangeBitSetMap = new 
ConcurrentSkipListMap<>();
+    private boolean threadSafe = true;
+    private final int bitSetSize;
+    private final LongPairConsumer<T> consumer;
+
+    // Cached results, kept to avoid recomputing the ranges on every call (CPU optimization).
+    private volatile int cachedSize = 0;
+    private volatile String cachedToString = "[]";
+    private volatile boolean updatedAfterCached = true;
+
+    /**
+     * Creates a range set with {@code threadSafe} enabled and the default {@code bitSetSize} of 1024.
+     *
+     * @param consumer the {@link LongPairConsumer} retained by this range set
+     */
+    public ConcurrentOpenLongPairRangeSet(LongPairConsumer<T> consumer) {
+        this(1024, consumer);
+    }
+
+    /**
+     * Creates a range set with {@code threadSafe} enabled and the given {@code bitSetSize}.
+     *
+     * @param size     value stored as {@code bitSetSize}
+     * @param consumer the {@link LongPairConsumer} retained by this range set
+     */
+    public ConcurrentOpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
+        this(size, /* threadSafe */ true, consumer);
+    }
+
+    /**
+     * Creates a range set with every setting supplied explicitly.
+     *
+     * @param size       value stored as {@code bitSetSize}
+     * @param threadSafe value stored in the {@code threadSafe} flag
+     * @param consumer   the {@link LongPairConsumer} retained by this range set
+     */
+    public ConcurrentOpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
+        this.consumer = consumer;
+        this.bitSetSize = size;
+        this.threadSafe = threadSafe;
+    }
+
+    class ConcurrentBitSet extends BitSet {
+        private static final long serialVersionUID = 1L;
+        private final StampedLock rwLock = new StampedLock();
+
+        /**
+         * Builds an empty concurrent bit set that is pre-sized to represent bit indices
+         * {@code 0} through {@code nbits - 1}; every bit starts out cleared.
+         *
+         * @param nbits
+         *            number of bits to reserve up front
+         * @throws NegativeArraySizeException
+         *             when {@code nbits} is negative
+         */
+        public ConcurrentBitSet(int nbits) {
+            super(nbits);
+        }
+
+        @Override
+        public boolean get(int bitIndex) {
+            return super.get(bitIndex);
+        }
+
+        /**
+         * Sets the bit at {@code bitIndex} to {@code true} while holding the exclusive write lock.
+         */
+        @Override
+        public void set(int bitIndex) {
+            final long writeStamp = rwLock.writeLock();
+            try {
+                super.set(bitIndex);
+            } finally {
+                // Release unconditionally, even when super.set throws (e.g. for a negative index).
+                rwLock.unlockWrite(writeStamp);
+            }
+        }
+
+        /**
+         * Sets every bit from {@code fromIndex} (inclusive) to {@code toIndex} (exclusive) to
+         * {@code true} while holding the exclusive write lock.
+         */
+        @Override
+        public void set(int fromIndex, int toIndex) {
+            final long writeStamp = rwLock.writeLock();
+            try {
+                super.set(fromIndex, toIndex);
+            } finally {
+                // Release unconditionally, even when super.set rejects the range.
+                rwLock.unlockWrite(writeStamp);
+            }
+        }
+
+        /**
+         * Returns the index of the first set bit at or after {@code fromIndex}, or {@code -1} if
+         * there is none.
+         *
+         * <p>Reads optimistically first and takes the shared read lock only when a concurrent
+         * writer invalidated the optimistic stamp. When there is no contention, the cost is a
+         * stamp read plus a validation instead of a full read-lock acquire and release.
+         * NOTE(review): this assumes every mutator of this bit set takes {@code rwLock} for writing.
+         *
+         * @throws IndexOutOfBoundsException if {@code fromIndex} is negative
+         */
+        @Override
+        public int nextSetBit(int fromIndex) {
+            long stamp = rwLock.tryOptimisticRead();
+            if (stamp != 0L) {
+                try {
+                    int nextBit = super.nextSetBit(fromIndex);
+                    if (rwLock.validate(stamp)) {
+                        return nextBit;
+                    }
+                } catch (IndexOutOfBoundsException ignored) {
+                    // An overlapping write can expose torn internal state to the optimistic read;
+                    // fall through to the locked read, which rethrows if fromIndex really is invalid.
+                }
+            }
+            stamp = rwLock.readLock();
+            try {
+                return super.nextSetBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+
+        @Override
+        public int nextClearBit(int fromIndex) {
+            long stamp = rwLock.readLock();
+            try {
+                return super.nextClearBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
 
 Review comment:
   Using the stamped lock allows for a less expensive read path, e.g.:
   
   ```java
   long stamp = rwLock.tryOptimisticRead();
   int bit = super.nextClearBit(fromIndex);
   if (!rwLock.validate(stamp)) {
       stamp = rwLock.readLock();
       try {
          bit = super.nextClearBit(fromIndex);
       } finally {
          rwLock.unlockRead(stamp);
       }
   }
   return bit;
   ```
   
   The idea is to fall back to the read lock only when there's a write happening at the same time; otherwise it's just 2 volatile reads.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to