wanglijie95 commented on code in PR #22908:
URL: https://github.com/apache/flink/pull/22908#discussion_r1262051770


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java:
##########
@@ -83,6 +83,13 @@ public int bitSize() {
         return bitLength;
     }
 
+    public byte[] getData() {

Review Comment:
   Maybe `getData` -> `toBytes` or `toByteArray`, and add a doc comment to indicate that it currently only supports heap memory.
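   For example, a rough sketch of the renamed method (assuming the existing `memorySegment`/`offset`/`byteLength` fields of `BitSet` and a static import of `org.apache.flink.util.Preconditions.checkState`; not the exact body from this PR):
   ```java
   /**
    * Returns a copy of the underlying bytes of this bit set.
    *
    * <p>Note: currently only bit sets backed by heap memory are supported.
    */
   public byte[] toByteArray() {
       checkState(!memorySegment.isOffHeap(), "Only support use heap memory for serialization");
       // copy the backing bytes out of the heap memory segment
       byte[] bytes = new byte[byteLength];
       memorySegment.get(offset, bytes, 0, byteLength);
       return bytes;
   }
   ```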
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java:
##########
@@ -83,6 +83,13 @@ public int bitSize() {
         return bitLength;
     }
 
+    public byte[] getData() {
+        if (memorySegment.isOffHeap()) {
+            throw new IllegalStateException("Only support use heap memory for serialization");
+        }

Review Comment:
   This `if` block can be replaced by `checkState`.
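   For example, the whole block collapses to a one-liner (with a static import of `org.apache.flink.util.Preconditions.checkState`):
   ```java
   // equivalent to the manual if/throw: checkState throws an IllegalStateException
   // with the given message when the condition is false
   checkState(!memorySegment.isOffHeap(), "Only support use heap memory for serialization");
   ```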



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java:
##########
@@ -135,6 +144,30 @@ public void reset() {
         this.bitSet.clear();
     }
 
+    /**
+     * Merge the specified bloom filter with current bloom filter.
+     *
+     * @param that - bloom filter to merge
+     */
+    public void merge(BloomFilter that) {

Review Comment:
   Is this method needed? I think we only need `mergeBloomFilterBytes`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java:
##########
@@ -143,4 +176,63 @@ public String toString() {
         output.append(bitSet);
         return output.toString();
     }
+
+    public static byte[] toBytes(BloomFilter filter) {
+        byte[] data = filter.bitSet.getData();
+        int byteSize = data.length;
+        byte[] bytes = new byte[8 + byteSize];
+        UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET, filter.numHashFunctions);
+        UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET + 4, byteSize);
+        UNSAFE.copyMemory(
+                data, BYTE_ARRAY_BASE_OFFSET, bytes, BYTE_ARRAY_BASE_OFFSET + 8, byteSize);
+        return bytes;
+    }
+
+    public static BloomFilter fromBytes(byte[] bytes) {

Review Comment:
   Add a doc comment, and indicate that it currently only supports heap memory.
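   Something along these lines (wording is just a suggestion):
   ```java
   /**
    * Deserializes a {@link BloomFilter} from a byte array produced by {@link #toBytes}.
    *
    * <p>Note: the returned bloom filter is backed by heap memory; off-heap memory is
    * currently not supported.
    */
   public static BloomFilter fromBytes(byte[] bytes) {
   ```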



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java:
##########
@@ -43,17 +45,24 @@
  */
 public class BloomFilter {
 
+    @SuppressWarnings("restriction")
+    private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
     protected BitSet bitSet;
-    protected int expectedEntries;
     protected int numHashFunctions;
 
     public BloomFilter(int expectedEntries, int byteSize) {
         checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
-        this.expectedEntries = expectedEntries;
         this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, byteSize << 3);
         this.bitSet = new BitSet(byteSize);
     }
 
+    /** A constructor to support rebuilding the BloomFilter from a serialized representation. */
+    private BloomFilter(BitSet bitSet, int numHashFunctions) {

Review Comment:
   Check that `bitSet` is not null, and that `numHashFunctions` is > 0.
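   For example, using the `Preconditions` utilities this class already relies on (static imports of `checkNotNull` and `checkArgument` from `org.apache.flink.util.Preconditions` assumed):
   ```java
   private BloomFilter(BitSet bitSet, int numHashFunctions) {
       // fail fast on invalid input when rebuilding from a serialized representation
       this.bitSet = checkNotNull(bitSet, "bitSet must not be null");
       checkArgument(numHashFunctions > 0, "numHashFunctions should be > 0");
       this.numHashFunctions = numHashFunctions;
   }
   ```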



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java:
##########
@@ -168,4 +173,161 @@ public void testHashcodeInput() {
         assertTrue(bloomFilter.testHash(val4));
         assertTrue(bloomFilter.testHash(val5));
     }
+
+    @Test
+    public void testBloomFilterSerDe() {

Review Comment:
   Can we migrate this test to JUnit 5?
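   A sketch of the JUnit 5 shape (package-private class and method, AssertJ assertions; the body is only an illustrative serde round trip, not the PR's actual test):
   ```java
   import org.apache.flink.core.memory.MemorySegmentFactory;

   import org.junit.jupiter.api.Test;

   import static org.assertj.core.api.Assertions.assertThat;

   class BloomFilterTest {

       @Test
       void testBloomFilterSerDe() {
           final int byteSize = 1024;
           BloomFilter filter = new BloomFilter(1024, byteSize);
           filter.setBitsLocation(MemorySegmentFactory.allocateUnpooledSegment(byteSize), 0);
           filter.addHash(42);

           // serialize, deserialize, and check the element is still present
           BloomFilter deserialized = BloomFilter.fromBytes(BloomFilter.toBytes(filter));
           assertThat(deserialized.testHash(42)).isTrue();
       }
   }
   ```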



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java:
##########
@@ -143,4 +176,63 @@ public String toString() {
         output.append(bitSet);
         return output.toString();
     }
+
+    public static byte[] toBytes(BloomFilter filter) {

Review Comment:
   Add a doc comment, and indicate that it currently only supports heap memory.
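   For instance (the layout in the doc matches the writes in the method body):
   ```java
   /**
    * Serializes the given {@link BloomFilter} into a byte array: 4 bytes for the number of
    * hash functions, 4 bytes for the bit set length, followed by the bit set data.
    *
    * <p>Note: currently only bloom filters backed by heap memory are supported.
    */
   public static byte[] toBytes(BloomFilter filter) {
   ```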



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java:
##########
@@ -143,4 +176,63 @@ public String toString() {
         output.append(bitSet);
         return output.toString();
     }
+
+    public static byte[] toBytes(BloomFilter filter) {
+        byte[] data = filter.bitSet.getData();
+        int byteSize = data.length;
+        byte[] bytes = new byte[8 + byteSize];
+        UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET, filter.numHashFunctions);
+        UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET + 4, byteSize);
+        UNSAFE.copyMemory(
+                data, BYTE_ARRAY_BASE_OFFSET, bytes, BYTE_ARRAY_BASE_OFFSET + 8, byteSize);
+        return bytes;
+    }
+
+    public static BloomFilter fromBytes(byte[] bytes) {
+        int numHashFunctions = UNSAFE.getInt(bytes, BYTE_ARRAY_BASE_OFFSET);
+        int byteSize = UNSAFE.getInt(bytes, BYTE_ARRAY_BASE_OFFSET + 4);
+        byte[] data = new byte[byteSize];
+        UNSAFE.copyMemory(
+                bytes, BYTE_ARRAY_BASE_OFFSET + 8, data, BYTE_ARRAY_BASE_OFFSET, byteSize);
+
+        BitSet bitSet = new BitSet(byteSize);
+        bitSet.setMemorySegment(MemorySegmentFactory.wrap(data), 0);
+        return new BloomFilter(bitSet, numHashFunctions);
+    }
+
+    public static byte[] mergeBloomFilterBytes(byte[] bf1Bytes, byte[] bf2Bytes) {
+        return mergeBloomFilterBytes(bf1Bytes, 0, bf1Bytes.length, bf2Bytes, 0, bf2Bytes.length);
+    }
+
+    /** Merge the bf2 bytes to bf1 */
+    public static byte[] mergeBloomFilterBytes(

Review Comment:
   This method can be private.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java:
##########
@@ -143,4 +176,63 @@ public String toString() {
         output.append(bitSet);
         return output.toString();
     }
+
+    public static byte[] toBytes(BloomFilter filter) {
+        byte[] data = filter.bitSet.getData();
+        int byteSize = data.length;
+        byte[] bytes = new byte[8 + byteSize];
+        UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET, filter.numHashFunctions);
+        UNSAFE.putInt(bytes, BYTE_ARRAY_BASE_OFFSET + 4, byteSize);
+        UNSAFE.copyMemory(
+                data, BYTE_ARRAY_BASE_OFFSET, bytes, BYTE_ARRAY_BASE_OFFSET + 8, byteSize);
+        return bytes;
+    }
+
+    public static BloomFilter fromBytes(byte[] bytes) {
+        int numHashFunctions = UNSAFE.getInt(bytes, BYTE_ARRAY_BASE_OFFSET);
+        int byteSize = UNSAFE.getInt(bytes, BYTE_ARRAY_BASE_OFFSET + 4);
+        byte[] data = new byte[byteSize];
+        UNSAFE.copyMemory(
+                bytes, BYTE_ARRAY_BASE_OFFSET + 8, data, BYTE_ARRAY_BASE_OFFSET, byteSize);
+
+        BitSet bitSet = new BitSet(byteSize);
+        bitSet.setMemorySegment(MemorySegmentFactory.wrap(data), 0);
+        return new BloomFilter(bitSet, numHashFunctions);
+    }
+
+    public static byte[] mergeBloomFilterBytes(byte[] bf1Bytes, byte[] bf2Bytes) {
+        return mergeBloomFilterBytes(bf1Bytes, 0, bf1Bytes.length, bf2Bytes, 0, bf2Bytes.length);
+    }
+
+    /** Merge the bf2 bytes to bf1 */
+    public static byte[] mergeBloomFilterBytes(

Review Comment:
   Maybe name it `mergeSerializedBloomFilters`? Also add a doc comment to indicate that `bf1Bytes` will be changed by the merge.
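   For example (parameter names are illustrative, since the full signature is cut off in the diff; this also applies the `private` visibility suggested above):
   ```java
   /**
    * Merges the serialized bloom filter in {@code bf2Bytes} into the one in {@code bf1Bytes}.
    *
    * <p>Note: the contents of {@code bf1Bytes} are modified by the merge.
    */
   private static byte[] mergeSerializedBloomFilters(
           byte[] bf1Bytes, int bf1Start, int bf1Length, byte[] bf2Bytes, int bf2Start, int bf2Length) {
   ```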



##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java:
##########
@@ -168,4 +173,161 @@ public void testHashcodeInput() {
         assertTrue(bloomFilter.testHash(val4));
         assertTrue(bloomFilter.testHash(val5));
     }
+
+    @Test
+    public void testBloomFilterSerDe() {

Review Comment:
   `testBloomFilterSerializationAndDeserialization`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
