This is an automated email from the ASF dual-hosted git repository.
kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 2b5b918a6 Fix problem writing sliced BooleanBuffers as fast-encoding
format (#1522)
2b5b918a6 is described below
commit 2b5b918a6aadb44f14e12e82ce510e86df778105
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 26 03:40:46 2025 +0800
Fix problem writing sliced BooleanBuffers as fast-encoding format (#1522)
## Which issue does this PR close?
Closes #1520.
## Rationale for this change
This is a problem I found when working on
https://github.com/apache/datafusion-comet/pull/1511: the null bits were not
correctly written, which caused test failures. This patch is an attempt to fix it.
This patch only aims to fix correctness problems. As
https://github.com/apache/datafusion-comet/pull/1190#discussion_r1911955126
pointed out, the fast BatchWriter may write the full data buffer for sliced `Utf8`
arrays, so there are still some performance implications when working with sliced
arrays.
## What changes are included in this PR?
Correctly take slicing indices and length into account when writing
BooleanBuffers. This applies to the null bits of all arrays and to the values of
boolean arrays.
## How are these changes tested?
Added a new round-trip test for sliced record batches.
---
native/core/src/execution/shuffle/codec.rs | 41 +++++++++++++++++++++++++++---
1 file changed, 38 insertions(+), 3 deletions(-)
diff --git a/native/core/src/execution/shuffle/codec.rs
b/native/core/src/execution/shuffle/codec.rs
index 4e957ac8a..ff02b933f 100644
--- a/native/core/src/execution/shuffle/codec.rs
+++ b/native/core/src/execution/shuffle/codec.rs
@@ -181,7 +181,7 @@ impl<W: Write> BatchWriter<W> {
// be determined from the data buffer size (length is in bits
rather than bytes)
self.write_all(&arr.len().to_le_bytes())?;
// write data buffer
- self.write_buffer(arr.values().inner())?;
+ self.write_boolean_buffer(arr.values())?;
// write null buffer
self.write_null_buffer(arr.nulls())?;
}
@@ -300,8 +300,7 @@ impl<W: Write> BatchWriter<W> {
// write null buffer length in bits
self.write_all(&buffer.len().to_le_bytes())?;
// write null buffer
- let buffer = buffer.inner();
- self.write_buffer(buffer)?;
+ self.write_boolean_buffer(buffer)?;
} else {
self.inner.write_all(&0_usize.to_le_bytes())?;
}
@@ -315,6 +314,19 @@ impl<W: Write> BatchWriter<W> {
self.inner.write_all(buffer.as_slice())
}
+ fn write_boolean_buffer(&mut self, buffer: &BooleanBuffer) ->
std::io::Result<()> {
+ let inner_buffer = buffer.inner();
+ if buffer.offset() == 0 && buffer.len() == inner_buffer.len() {
+ // Not a sliced buffer, write the inner buffer directly
+ self.write_buffer(inner_buffer)?;
+ } else {
+ // Sliced buffer, create and write the sliced buffer
+ let buffer = buffer.sliced();
+ self.write_buffer(&buffer)?;
+ }
+ Ok(())
+ }
+
pub fn inner(self) -> W {
self.inner
}
@@ -621,6 +633,29 @@ mod test {
assert_eq!(batch, batch2);
}
+ #[test]
+ fn roundtrip_sliced() {
+ let batch = create_batch(8192, true);
+
+ let mut start = 0;
+ let batch_size = 128;
+ while start < batch.num_rows() {
+ let end = (start + batch_size).min(batch.num_rows());
+ let sliced_batch = batch.slice(start, end - start);
+ let buffer = Vec::new();
+ let mut writer = BatchWriter::new(buffer);
+ writer.write_partial_schema(&sliced_batch.schema()).unwrap();
+ writer.write_batch(&sliced_batch).unwrap();
+ let buffer = writer.inner();
+
+ let mut reader = BatchReader::new(&buffer);
+ let batch2 = reader.read_batch().unwrap();
+ assert_eq!(sliced_batch, batch2);
+
+ start = end;
+ }
+ }
+
fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("bool", DataType::Boolean, true),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]