This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b5b918a6 Fix problem writing sliced BooleanBuffers as fast-encoding 
format (#1522)
2b5b918a6 is described below

commit 2b5b918a6aadb44f14e12e82ce510e86df778105
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 26 03:40:46 2025 +0800

    Fix problem writing sliced BooleanBuffers as fast-encoding format (#1522)
    
    ## Which issue does this PR close?
    
    Closes #1520.
    
    ## Rationale for this change
    
    I found this problem while working on 
https://github.com/apache/datafusion-comet/pull/1511: the null bits were not 
written correctly, which caused test failures. This patch is an attempt to fix it.
    
    This patch only aims to fix correctness problems. As 
https://github.com/apache/datafusion-comet/pull/1190#discussion_r1911955126 
pointed out, the fast BatchWriter may write the full data buffer for sliced 
`Utf8` arrays, so there are still some performance implications when working 
with sliced arrays.
    
    ## What changes are included in this PR?
    
    Correctly take the slice offset and length into account when writing 
BooleanBuffers. This applies to the null bits of all arrays and to the values 
of boolean arrays.
    
    ## How are these changes tested?
    
    Added a new round-trip test for sliced record batches.
---
 native/core/src/execution/shuffle/codec.rs | 41 +++++++++++++++++++++++++++---
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git a/native/core/src/execution/shuffle/codec.rs 
b/native/core/src/execution/shuffle/codec.rs
index 4e957ac8a..ff02b933f 100644
--- a/native/core/src/execution/shuffle/codec.rs
+++ b/native/core/src/execution/shuffle/codec.rs
@@ -181,7 +181,7 @@ impl<W: Write> BatchWriter<W> {
                 // be determined from the data buffer size (length is in bits 
rather than bytes)
                 self.write_all(&arr.len().to_le_bytes())?;
                 // write data buffer
-                self.write_buffer(arr.values().inner())?;
+                self.write_boolean_buffer(arr.values())?;
                 // write null buffer
                 self.write_null_buffer(arr.nulls())?;
             }
@@ -300,8 +300,7 @@ impl<W: Write> BatchWriter<W> {
             // write null buffer length in bits
             self.write_all(&buffer.len().to_le_bytes())?;
             // write null buffer
-            let buffer = buffer.inner();
-            self.write_buffer(buffer)?;
+            self.write_boolean_buffer(buffer)?;
         } else {
             self.inner.write_all(&0_usize.to_le_bytes())?;
         }
@@ -315,6 +314,19 @@ impl<W: Write> BatchWriter<W> {
         self.inner.write_all(buffer.as_slice())
     }
 
+    fn write_boolean_buffer(&mut self, buffer: &BooleanBuffer) -> 
std::io::Result<()> {
+        let inner_buffer = buffer.inner();
+        if buffer.offset() == 0 && buffer.len() == inner_buffer.len() {
+            // Not a sliced buffer, write the inner buffer directly
+            self.write_buffer(inner_buffer)?;
+        } else {
+            // Sliced buffer, create and write the sliced buffer
+            let buffer = buffer.sliced();
+            self.write_buffer(&buffer)?;
+        }
+        Ok(())
+    }
+
     pub fn inner(self) -> W {
         self.inner
     }
@@ -621,6 +633,29 @@ mod test {
         assert_eq!(batch, batch2);
     }
 
+    #[test]
+    fn roundtrip_sliced() {
+        let batch = create_batch(8192, true);
+
+        let mut start = 0;
+        let batch_size = 128;
+        while start < batch.num_rows() {
+            let end = (start + batch_size).min(batch.num_rows());
+            let sliced_batch = batch.slice(start, end - start);
+            let buffer = Vec::new();
+            let mut writer = BatchWriter::new(buffer);
+            writer.write_partial_schema(&sliced_batch.schema()).unwrap();
+            writer.write_batch(&sliced_batch).unwrap();
+            let buffer = writer.inner();
+
+            let mut reader = BatchReader::new(&buffer);
+            let batch2 = reader.read_batch().unwrap();
+            assert_eq!(sliced_batch, batch2);
+
+            start = end;
+        }
+    }
+
     fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
         let schema = Arc::new(Schema::new(vec![
             Field::new("bool", DataType::Boolean, true),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to