parthchandra commented on code in PR #3003:
URL: https://github.com/apache/datafusion-comet/pull/3003#discussion_r2691558689


##########
native/spark-expr/src/bloom_filter/spark_bloom_filter.rs:
##########
@@ -160,11 +160,76 @@ impl SparkBloomFilter {
     }
 
     pub fn merge_filter(&mut self, other: &[u8]) {
-        assert_eq!(
-            other.len(),
-            self.bits.byte_size(),
-            "Cannot merge SparkBloomFilters with different lengths."
-        );
-        self.bits.merge_bits(other);
+        // Extract bits data if other is in Spark's full serialization format
+        // We need to compute the expected size and extract data before 
borrowing self.bits mutably
+        let expected_bits_size = self.bits.byte_size();
+        const SPARK_HEADER_SIZE: usize = 12; // version (4) + 
num_hash_functions (4) + num_words (4)
+
+        let bits_data = if other.len() >= SPARK_HEADER_SIZE {
+            // Check if this is Spark's serialization format by reading the 
version
+            let version = i32::from_be_bytes([
+                other[0], other[1], other[2], other[3],
+            ]);
+            if version == SPARK_BLOOM_FILTER_VERSION_1 {
+                // This is Spark's full format, parse it to extract bits data
+                let num_words = i32::from_be_bytes([
+                    other[8], other[9], other[10], other[11],
+                ]) as usize;
+                let bits_start = SPARK_HEADER_SIZE;
+                let bits_end = bits_start + (num_words * 8);
+                
+                // Verify the buffer is large enough
+                if bits_end > other.len() {
+                    panic!(
+                        "Cannot merge SparkBloomFilters: buffer too short. 
Expected at least {} bytes ({} words), got {} bytes",
+                        bits_end,
+                        num_words,
+                        other.len()
+                    );
+                }
+                
+                // Check if the incoming bloom filter has compatible size
+                let incoming_bits_size = bits_end - bits_start;
+                if incoming_bits_size != expected_bits_size {
+                    panic!(

Review Comment:
   Can we use `CometError::Internal(String)` instead of `panic!`? (You'll need 
to return a Result)



##########
native/spark-expr/src/bloom_filter/spark_bloom_filter.rs:
##########
@@ -160,11 +160,76 @@ impl SparkBloomFilter {
     }
 
     pub fn merge_filter(&mut self, other: &[u8]) {
-        assert_eq!(
-            other.len(),
-            self.bits.byte_size(),
-            "Cannot merge SparkBloomFilters with different lengths."
-        );
-        self.bits.merge_bits(other);
+        // Extract bits data if other is in Spark's full serialization format
+        // We need to compute the expected size and extract data before 
borrowing self.bits mutably
+        let expected_bits_size = self.bits.byte_size();
+        const SPARK_HEADER_SIZE: usize = 12; // version (4) + 
num_hash_functions (4) + num_words (4)
+
+        let bits_data = if other.len() >= SPARK_HEADER_SIZE {

Review Comment:
   Should this be strictly greater than SPARK_HEADER_SIZE? 



##########
native/spark-expr/src/bloom_filter/spark_bloom_filter.rs:
##########
@@ -160,11 +160,76 @@ impl SparkBloomFilter {
     }
 
     pub fn merge_filter(&mut self, other: &[u8]) {
-        assert_eq!(
-            other.len(),
-            self.bits.byte_size(),
-            "Cannot merge SparkBloomFilters with different lengths."
-        );
-        self.bits.merge_bits(other);
+        // Extract bits data if other is in Spark's full serialization format
+        // We need to compute the expected size and extract data before 
borrowing self.bits mutably
+        let expected_bits_size = self.bits.byte_size();
+        const SPARK_HEADER_SIZE: usize = 12; // version (4) + 
num_hash_functions (4) + num_words (4)
+
+        let bits_data = if other.len() >= SPARK_HEADER_SIZE {
+            // Check if this is Spark's serialization format by reading the 
version
+            let version = i32::from_be_bytes([
+                other[0], other[1], other[2], other[3],
+            ]);
+            if version == SPARK_BLOOM_FILTER_VERSION_1 {

Review Comment:
   Is this sufficient to ensure that this is a spark bloom filter? Isn't there 
a chance the starting 4 bytes of the Comet bloom filter might match the pattern?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to