clintropolis commented on a change in pull request #6397: Adds bloom filter 
aggregator to 'druid-bloom-filters' extension
URL: https://github.com/apache/incubator-druid/pull/6397#discussion_r247308437
 
 

 ##########
 File path: 
extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/filter/BloomKFilter.java
 ##########
 @@ -210,6 +218,214 @@ public static void mergeBloomFilterBytes(
     }
   }
 
+  public static void serialize(ByteBuffer out, BloomKFilter bloomFilter)
+  {
+    serialize(out, out.position(), bloomFilter);
+  }
+
+  /**
+   * Serialize a bloom filter to a ByteBuffer
+   *
+   * @param out         output buffer to write to
+   * @param position    output buffer position
+   * @param bloomFilter BloomKFilter that needs to be serialized
+   */
+  public static void serialize(ByteBuffer out, int position, BloomKFilter 
bloomFilter)
+  {
+    /**
+     * Serialized BloomKFilter format:
+     * 1 byte for the number of hash functions.
+     * 1 big endian int(That is how OutputStream works) for the number of 
longs in the bitset
+     * big endian longs in the BloomKFilter bitset
+     */
+    ByteBuffer view = out.duplicate().order(ByteOrder.BIG_ENDIAN);
+    view.position(position);
+    view.put((byte) bloomFilter.k);
+    view.putInt(bloomFilter.getBitSet().length);
+    for (long value : bloomFilter.getBitSet()) {
+      view.putLong(value);
+    }
+  }
+
+  public static BloomKFilter deserialize(ByteBuffer in) throws IOException
+  {
+    return deserialize(in, in.position());
+  }
+
+  /**
+   * Deserialize a bloom filter
+   * Read a byte buffer, which was written by {@linkplain 
#serialize(OutputStream, BloomKFilter)} or
+   * {@linkplain #serialize(ByteBuffer, int, BloomKFilter)}
+   * into a {@code BloomKFilter}
+   *
+   * @param in input ByteBuffer
+   *
+   * @return deserialized BloomKFilter
+   */
+  public static BloomKFilter deserialize(ByteBuffer in, int position) throws 
IOException
+  {
+    if (in == null) {
+      throw new IOException("Input stream is null");
+    }
+
+    try {
+      ByteBuffer dataBuffer = in.duplicate().order(ByteOrder.BIG_ENDIAN);
+      dataBuffer.position(position);
+      int numHashFunc = dataBuffer.get();
+      int bitsetArrayLen = dataBuffer.getInt();
+      long[] data = new long[bitsetArrayLen];
+      for (int i = 0; i < bitsetArrayLen; i++) {
+        data[i] = dataBuffer.getLong();
+      }
+      return new BloomKFilter(data, numHashFunc);
+    }
+    catch (RuntimeException e) {
+      IOException io = new IOException("Unable to deserialize BloomKFilter");
+      io.initCause(e);
+      throw io;
+    }
+  }
+
+  /**
+   * Merges BloomKFilter bf2 into bf1.
+   * Assumes 2 BloomKFilters with the same size/hash functions are serialized 
to ByteBuffers
+   *
+   * @param bf1Buffer
+   * @param bf1Start
+   * @param bf2Buffer
+   * @param bf2Start
+   */
+  public static void mergeBloomFilterByteBuffers(
+      ByteBuffer bf1Buffer,
+      int bf1Start,
+      ByteBuffer bf2Buffer,
+      int bf2Start
+  )
+  {
+    ByteBuffer view1 = bf1Buffer.duplicate().order(ByteOrder.BIG_ENDIAN);
+    ByteBuffer view2 = bf2Buffer.duplicate().order(ByteOrder.BIG_ENDIAN);
+    final int bf1Length = START_OF_SERIALIZED_LONGS + (view1.getInt(1 + 
bf1Start) * Long.BYTES);
+    final int bf2Length = START_OF_SERIALIZED_LONGS + (view2.getInt(1 + 
bf2Start) * Long.BYTES);
+
+    if (bf1Length != bf2Length) {
+      throw new IllegalArgumentException("bf1Length " + bf1Length + " does not 
match bf2Length " + bf2Length);
+    }
+
+    // Validation on the bitset size/3 hash functions.
+    for (int idx = 0; idx < START_OF_SERIALIZED_LONGS; ++idx) {
+      if (view1.get(bf1Start + idx) != view2.get(bf2Start + idx)) {
+        throw new IllegalArgumentException("bf1 NumHashFunctions/NumBits does 
not match bf2");
+      }
+    }
+
+    // Just bitwise-OR the bits together - size/# functions should be the same,
+    // rest of the data is serialized long values for the bitset which are 
supposed to be bitwise-ORed.
+    for (int idx = START_OF_SERIALIZED_LONGS; idx < bf1Length; ++idx) {
+      final int pos1 = bf1Start + idx;
+      final int pos2 = bf2Start + idx;
+      view1.put(pos1, (byte) (view1.get(pos1) | view2.get(pos2)));
+    }
+  }
+
+  /**
+   * Caculate size in bytes of a BloomKFilter for a given number of entries
 
 Review comment:
   Fixed (and a few other occurrences)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to