This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 14c4b23  [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
14c4b23 is described below

commit 14c4b23ca8744f83864a8c91ff0c1b88af52e532
Author: liyafan82 <[email protected]>
AuthorDate: Tue Jul 9 19:08:08 2019 +0800

    [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
    
    This closes #8613.
---
 .../flink/runtime/operators/util/BitSet.java       | 15 ++--
 .../flink/runtime/operators/util/BloomFilter.java  | 79 ----------------------
 .../flink/runtime/operators/util/BitSetTest.java   | 19 +++++-
 .../runtime/operators/util/BloomFilterTest.java    |  7 +-
 4 files changed, 27 insertions(+), 93 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
index b6e1e07..df13caf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
@@ -32,7 +32,6 @@ public class BitSet {
        // The BitSet bit size.
        private int bitLength;
 
-       private final int BYTE_POSITION_MASK = 0xfffffff8;
        private final int BYTE_INDEX_MASK = 0x00000007;
 
        public BitSet(int byteSize) {
@@ -58,7 +57,7 @@ public class BitSet {
        public void set(int index) {
                Preconditions.checkArgument(index < bitLength && index >= 0);
 
-               int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+               int byteIndex = index >>> 3;
                byte current = memorySegment.get(offset + byteIndex);
                current |= (1 << (index & BYTE_INDEX_MASK));
                memorySegment.put(offset + byteIndex, current);
@@ -73,7 +72,7 @@ public class BitSet {
        public boolean get(int index) {
                Preconditions.checkArgument(index < bitLength && index >= 0);
                
-               int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+               int byteIndex = index >>> 3;
                byte current = memorySegment.get(offset + byteIndex);
                return (current & (1 << (index & BYTE_INDEX_MASK))) != 0;
        }
@@ -89,8 +88,14 @@ public class BitSet {
         * Clear the bit set.
         */
        public void clear() {
-               for (int i = 0; i < byteLength; i++) {
-                       memorySegment.put(offset + i, (byte) 0);
+               int index = 0;
+               while (index + 8 <= byteLength) {
+                       memorySegment.putLong(offset + index, 0L);
+                       index += 8;
+               }
+               while (index < byteLength) {
+                       memorySegment.put(offset + index, (byte) 0);
+                       index += 1;
                }
        }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
index 5f09391..5abf5a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -143,83 +143,4 @@ public class BloomFilter {
                output.append(bitSet);
                return output.toString();
        }
-       
-       /**
-        * Bare metal bit set implementation. For performance reasons, this 
implementation does not check
-        * for index bounds nor expand the bit set size if the specified index 
is greater than the size.
-        */
-       public class BitSet {
-               private MemorySegment memorySegment;
-               // MemorySegment byte array offset.
-               private int offset;
-               // MemorySegment byte size.
-               private int length;
-               private final int LONG_POSITION_MASK = 0xffffffc0;
-               
-               public BitSet(int byteSize) {
-                       checkArgument(byteSize > 0, "bits size should be 
greater than 0.");
-                       checkArgument(byteSize << 29 == 0, "bytes size should 
be integral multiple of long size(8 Bytes).");
-                       this.length = byteSize;
-               }
-               
-               public void setMemorySegment(MemorySegment memorySegment, int 
offset) {
-                       this.memorySegment = memorySegment;
-                       this.offset = offset;
-               }
-               
-               /**
-                * Sets the bit at specified index.
-                *
-                * @param index - position
-                */
-               public void set(int index) {
-                       int longIndex = (index & LONG_POSITION_MASK) >>> 3;
-                       long current = memorySegment.getLong(offset + 
longIndex);
-                       current |= (1L << index);
-                       memorySegment.putLong(offset + longIndex, current);
-               }
-               
-               /**
-                * Returns true if the bit is set in the specified index.
-                *
-                * @param index - position
-                * @return - value at the bit position
-                */
-               public boolean get(int index) {
-                       int longIndex = (index & LONG_POSITION_MASK) >>> 3;
-                       long current = memorySegment.getLong(offset + 
longIndex);
-                       return (current & (1L << index)) != 0;
-               }
-               
-               /**
-                * Number of bits
-                */
-               public int bitSize() {
-                       return length << 3;
-               }
-               
-               public MemorySegment getMemorySegment() {
-                       return this.memorySegment;
-               }
-               
-               /**
-                * Clear the bit set.
-                */
-               public void clear() {
-                       long zeroValue = 0L;
-                       for (int i = 0; i < (length / 8); i++) {
-                               memorySegment.putLong(offset + i * 8, 
zeroValue);
-                       }
-               }
-               
-               @Override
-               public String toString() {
-                       StringBuilder output = new StringBuilder();
-                       output.append("BitSet:\n");
-                       
output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
-                       output.append("\tOffset:").append(offset).append("\n");
-                       output.append("\tLength:").append(length).append("\n");
-                       return output.toString();
-               }
-       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
index ec8ae2b..dd1890f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
@@ -21,16 +21,24 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class BitSetTest {
 
        private BitSet bitSet;
-       int byteSize = 1024;
-       MemorySegment memorySegment = 
MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+       int byteSize;
+       MemorySegment memorySegment;
+
+       public BitSetTest(int byteSize) {
+               this.byteSize = byteSize;
+               memorySegment = 
MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+       }
 
        @Before
        public void init() {
@@ -67,7 +75,7 @@ public class BitSetTest {
        @Test
        public void testSetValues() {
                int bitSize = bitSet.bitSize();
-               assertEquals(bitSize, 8 * 1024);
+               assertEquals(bitSize, 8 * byteSize);
                for (int i = 0; i < bitSize; i++) {
                        assertFalse(bitSet.get(i));
                        if (i % 2 == 0) {
@@ -83,4 +91,9 @@ public class BitSetTest {
                        }
                }
        }
+
+       @Parameterized.Parameters(name = "byte size = {0}")
+       public static Object[] getByteSize() {
+               return new Integer[]{1000, 1024, 2019};
+       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
index 256d4bc..78d0d62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -63,12 +63,7 @@ public class BloomFilterTest {
        public void testBloomFilterArguments4() {
                new BloomFilter(1024, 0);
        }
-       
-       @Test(expected = IllegalArgumentException.class)
-       public void testBloomFilterArguments5() {
-               new BloomFilter(1024, 21);
-       }
-       
+
        @Test
        public void testBloomNumBits() {
                assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));

Reply via email to