tustvold commented on a change in pull request #1782: URL: https://github.com/apache/arrow-datafusion/pull/1782#discussion_r801534654
########## File path: datafusion/src/row/mod.rs ########## @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An implementation of Row backed by raw bytes +//! +//! Each tuple consists of up to three parts: [null bit set] [values] [var length data] +//! +//! The null bit set is used for null tracking and is aligned to 1-byte. It stores +//! one bit per field. +//! +//! In the region of the values, we store the fields in the order they are defined in the schema. +//! - For fixed-length, sequential access fields, we store them directly. +//! E.g., 4 bytes for int and 1 byte for bool. +//! - For fixed-length, update often fields, we store one 8-byte word per field. +//! - For fields of non-primitive or variable-length types, +//! we append their actual content to the end of the var length region and +//! store their offset relative to row base and their length, packed into an 8-byte word. 
+ +use arrow::datatypes::{DataType, Schema}; +use std::sync::Arc; + +mod bitmap; +mod reader; +mod writer; + +const UTF8_DEFAULT_SIZE: usize = 20; +const BINARY_DEFAULT_SIZE: usize = 100; + +/// Get relative offsets for each field and total width for values +fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) { + let mut offsets = vec![]; + let mut offset = null_width; + for f in schema.fields() { + offsets.push(offset); + offset += type_width(f.data_type()); + } + (offsets, offset - null_width) +} + +fn supported_type(dt: &DataType) -> bool { + use DataType::*; + matches!( + dt, + Boolean + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Int8 + | Int16 + | Int32 + | Int64 + | Float32 + | Float64 + | Date32 + | Date64 + | Utf8 + | Binary + ) +} + +fn var_length(dt: &DataType) -> bool { + use DataType::*; + matches!(dt, Utf8 | Binary) +} + +fn type_width(dt: &DataType) -> usize { + use DataType::*; + if var_length(dt) { + return 8; + } + match dt { + Boolean | UInt8 | Int8 => 1, + UInt16 | Int16 => 2, + UInt32 | Int32 | Float32 | Date32 => 4, + UInt64 | Int64 | Float64 | Date64 => 8, + _ => unreachable!(), + } +} + +fn estimate_row_width(null_width: usize, schema: &Arc<Schema>) -> usize { + let mut width = null_width; + for f in schema.fields() { + width += type_width(f.data_type()); + match f.data_type() { + DataType::Utf8 => width += UTF8_DEFAULT_SIZE, + DataType::Binary => width += BINARY_DEFAULT_SIZE, + _ => {} + } + } + (width.saturating_add(7) / 8) * 8 +} + +fn fixed_size(schema: &Arc<Schema>) -> bool { + schema.fields().iter().all(|f| !var_length(f.data_type())) +} + +fn supported(schema: &Arc<Schema>) -> bool { + schema + .fields() + .iter() + .all(|f| supported_type(f.data_type())) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::datasource::file_format::parquet::ParquetFormat; + use crate::datasource::file_format::FileFormat; + use crate::datasource::object_store::local::{ + local_object_reader, 
local_object_reader_stream, local_unpartitioned_file, + LocalFileSystem, + }; + use crate::error::Result; + use crate::execution::runtime_env::RuntimeEnv; + use crate::physical_plan::file_format::FileScanConfig; + use crate::physical_plan::{collect, ExecutionPlan}; + use crate::row::reader::read_as_batch; + use crate::row::writer::write_batch_unchecked; + use arrow::record_batch::RecordBatch; + use arrow::{array::*, datatypes::*}; + use DataType::*; + + macro_rules! fn_test_single_type { + ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { + paste::item! { + #[test] + #[allow(non_snake_case)] + fn [<test_single_ $TYPE>]() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); + let a = $ARRAY::from($VEC); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 1024]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? 
}; + assert_eq!(batch, output_batch); + Ok(()) + } + } + }; + } + + fn_test_single_type!( + BooleanArray, + Boolean, + vec![Some(true), Some(false), None, Some(true), None] + ); + + fn_test_single_type!( + Int8Array, + Int8, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int16Array, + Int16, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int32Array, + Int32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int64Array, + Int64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt8Array, + UInt8, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt16Array, + UInt16, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt32Array, + UInt32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt64Array, + UInt64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Float32Array, + Float32, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + ); + + fn_test_single_type!( + Float64Array, + Float64, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + ); + + fn_test_single_type!( + Date32Array, + Date32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Date64Array, + Date64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + StringArray, + Utf8, + vec![Some("hello"), Some("world"), None, Some(""), Some("")] + ); + + #[test] + fn test_single_binary() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); + let values: Vec<Option<&[u8]>> = + vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; + let a = BinaryArray::from_opt_vec(values); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 8192]; + let row_offsets = + { 
write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + + #[tokio::test] + async fn test_with_parquet() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; + let schema = exec.schema().clone(); + + let batches = collect(exec, runtime).await?; + assert_eq!(1, batches.len()); + let batch = &batches[0]; + + let mut vector = vec![0; 20480]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? }; + assert_eq!(*batch, output_batch); + + Ok(()) + } + + #[test] + #[should_panic(expected = "supported(schema)")] + fn test_unsupported_type_write() { + let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); Review comment: FWIW it would be really cool to support this as IOx uses it and it is just an `Int64Array` with a different logical type, but I can always add later :grin: ########## File path: datafusion/src/row/bitmap/mod.rs ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! General utilities for null bit section handling +//! +//! Note: this is a tailored version based on [arrow2 bitmap utils](https://github.com/jorgecarleitao/arrow2/tree/main/src/bitmap/utils) + +mod fmt; + +pub use fmt::fmt; + +const BIT_MASK: [u8; 8] = [1, 2, 4, 8, 16, 32, 64, 128]; +const UNSET_BIT_MASK: [u8; 8] = [ + 255 - 1, + 255 - 2, + 255 - 4, + 255 - 8, + 255 - 16, + 255 - 32, + 255 - 64, + 255 - 128, +]; +const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255]; + +/// Returns whether bit at position `i` in `byte` is set or not +#[inline] +pub fn is_set(byte: u8, i: usize) -> bool { + (byte & BIT_MASK[i]) != 0 +} + +/// Sets bit at position `i` in `byte` +#[inline] +pub fn set(byte: u8, i: usize, value: bool) -> u8 { + if value { + byte | BIT_MASK[i] + } else { + byte & UNSET_BIT_MASK[i] + } +} + +/// Sets bit at position `i` in `data` +#[inline] +pub fn set_bit(data: &mut [u8], i: usize, value: bool) { + data[i / 8] = set(data[i / 8], i % 8, value); +} + +/// Returns whether bit at position `i` in `data` is set or not. +/// +/// # Safety +/// `i >= data.len() * 8` results in undefined behavior +#[inline] +pub unsafe fn get_bit_unchecked(data: &[u8], i: usize) -> bool { + (*data.as_ptr().add(i >> 3) & BIT_MASK[i & 7]) != 0 +} + +/// Returns the number of bytes required to hold `bits` bits. +#[inline] +pub fn bytes_for(bits: usize) -> usize { + bits.saturating_add(7) / 8 Review comment: I was curious whether this optimizes to the same code as `ceil(value, 8)`, and the answer is: not quite ``` add rax, 7 mov rcx, -1 cmovae rcx, rax shr rcx, 3 ``` vs ``` mov rcx, rax shr rcx, 3 and eax, 7 cmp rax, 1 sbb rcx, -1 ``` The reason is that the saturating-add form is technically incorrect at the limit: when `bits > usize::MAX - 7`, the addition saturates to `usize::MAX`, so the result is `usize::MAX / 8` instead of the correctly rounded-up `usize::MAX / 8 + 1`.
I sincerely doubt there is any actual performance difference, but I was sufficiently nerd sniped :laughing: ########## File path: datafusion/src/row/reader.rs ########## @@ -0,0 +1,441 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Accessing row from raw bytes Review comment: I think a pretty picture would be very helpful, showing how data is encoded ########## File path: datafusion/src/row/writer.rs ########## @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reusable row writer backed by Vec<u8> to stitch attributes together + +use crate::row::bitmap::{bytes_for, set_bit}; +use crate::row::{estimate_row_width, fixed_size, get_offsets, supported}; +use arrow::array::Array; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use std::cmp::max; +use std::sync::Arc; + +/// Append batch from `row_idx` to `output` buffer start from `offset` +/// # Panics +/// +/// This function will panic if the output buffer doesn't have enough space to hold all the rows +pub fn write_batch_unchecked( + output: &mut [u8], + offset: usize, + batch: &RecordBatch, + row_idx: usize, + schema: Arc<Schema>, +) -> Vec<usize> { + let mut writer = RowWriter::new(&schema); + let mut current_offset = offset; + let mut offsets = vec![]; + for cur_row in row_idx..batch.num_rows() { + offsets.push(current_offset); + let row_width = write_row(&mut writer, cur_row, batch, &schema); + output[current_offset..current_offset + row_width] + .copy_from_slice(writer.get_row()); + current_offset += row_width; + writer.reset() + } + offsets +} + +macro_rules! set_idx { + ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); + }}; +} + +macro_rules! fn_set_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! 
{ + fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); + } + } + }; +} + +/// Reusable row writer backed by Vec<u8> +pub struct RowWriter { + data: Vec<u8>, + field_count: usize, + row_width: usize, + null_width: usize, + values_width: usize, + varlena_width: usize, + varlena_offset: usize, + field_offsets: Vec<usize>, +} + +impl RowWriter { + /// new + pub fn new(schema: &Arc<Schema>) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = bytes_for(field_count); + let (field_offsets, values_width) = get_offsets(null_width, schema); + let mut init_capacity = estimate_row_width(null_width, schema); + if !fixed_size(schema) { + // double the capacity to avoid repeated resize + init_capacity *= 2; + } + Self { + data: vec![0; init_capacity], + field_count, + row_width: 0, + null_width, + values_width, + varlena_width: 0, + varlena_offset: null_width + values_width, Review comment: This way of representing variable length data is pretty cool :+1: ########## File path: datafusion/src/row/mod.rs ########## @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An implementation of Row backed by raw bytes +//! +//! Each tuple consists of up to three parts: [null bit set] [values] [var length data] +//! +//! The null bit set is used for null tracking and is aligned to 1-byte. It stores +//! one bit per field. +//! +//! In the region of the values, we store the fields in the order they are defined in the schema. +//! - For fixed-length, sequential access fields, we store them directly. +//! E.g., 4 bytes for int and 1 byte for bool. +//! - For fixed-length, update often fields, we store one 8-byte word per field. +//! - For fields of non-primitive or variable-length types, +//! we append their actual content to the end of the var length region and +//! store their offset relative to row base and their length, packed into an 8-byte word. + +use arrow::datatypes::{DataType, Schema}; +use std::sync::Arc; + +mod bitmap; +mod reader; +mod writer; + +const UTF8_DEFAULT_SIZE: usize = 20; +const BINARY_DEFAULT_SIZE: usize = 100; + +/// Get relative offsets for each field and total width for values +fn get_offsets(null_width: usize, schema: &Arc<Schema>) -> (Vec<usize>, usize) { + let mut offsets = vec![]; + let mut offset = null_width; + for f in schema.fields() { + offsets.push(offset); + offset += type_width(f.data_type()); + } + (offsets, offset - null_width) +} + +fn supported_type(dt: &DataType) -> bool { + use DataType::*; + matches!( + dt, + Boolean + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Int8 + | Int16 + | Int32 + | Int64 + | Float32 + | Float64 + | Date32 + | Date64 + | Utf8 + | Binary + ) +} + +fn var_length(dt: &DataType) -> bool { + use DataType::*; + matches!(dt, Utf8 | Binary) +} + +fn type_width(dt: &DataType) -> usize { + use DataType::*; + if var_length(dt) { + return 8; Review comment: I think this should probably be `std::mem::size_of::<u64>()` or, ideally, a `varlena` offset type alias ##########
File path: datafusion/src/row/reader.rs ########## @@ -0,0 +1,441 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Accessing row from raw bytes + +use crate::error::{DataFusionError, Result}; +use crate::row::bitmap::{all_valid, bytes_for, get_bit_unchecked}; +use crate::row::{get_offsets, supported}; +use arrow::array::{make_builder, ArrayBuilder}; +use arrow::datatypes::{DataType, Schema}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch +pub fn read_as_batch( + data: &mut [u8], + schema: Arc<Schema>, + offsets: Vec<usize>, +) -> Result<RecordBatch> { + let row_num = offsets.len(); + let mut output = MutableRecordBatch::new(row_num, schema.clone()); + let mut row = RowReader::new(&schema, data); + + for offset in offsets.iter().take(row_num) { + row.point_to(*offset); + read_row(&row, &mut output, &schema)? + } + + output.output().map_err(DataFusionError::ArrowError) +} + +macro_rules! 
get_idx { + ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + let start = $SELF.base_offset + offset; + let end = start + $WIDTH; + $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap()) + }}; +} + +macro_rules! fn_get_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! { + fn [<get_ $NATIVE>](&self, idx: usize) -> $NATIVE { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + let start = self.base_offset + offset; + let end = start + $WIDTH; + $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap()) + } + } + }; +} + +macro_rules! fn_get_idx_opt { + ($NATIVE: ident) => { + paste::item! { + fn [<get_ $NATIVE _opt>](&self, idx: usize) -> Option<$NATIVE> { + if self.is_valid_at(idx) { + Some(self.[<get_ $NATIVE>](idx)) + } else { + None + } + } + } + }; +} + +struct RowReader<'a> { + data: &'a [u8], + base_offset: usize, + field_count: usize, + null_width: usize, + field_offsets: Vec<usize>, +} + +impl<'a> std::fmt::Debug for RowReader<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let null_bits = self.null_bits(); + super::bitmap::fmt(null_bits, 0, self.null_width, f) + } +} + +impl<'a> RowReader<'a> { + fn new(schema: &Arc<Schema>, data: &'a [u8]) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = bytes_for(field_count); + let (field_offsets, _) = get_offsets(null_width, schema); + Self { + data, + base_offset: 0, + field_count, + null_width, + field_offsets, + } + } + + /// Update this row to point to position `offset` in `base` + fn point_to(&mut self, offset: usize) { + self.base_offset = offset; + } + + #[inline] + fn assert_index_valid(&self, idx: usize) { + assert!(idx < self.field_count); + } + + #[inline(always)] + fn null_bits(&self) -> &[u8] { + let start = self.base_offset; + &self.data[start..start + self.null_width] + } + + 
#[inline(always)] + fn all_valid(&self) -> bool { + let null_bits = self.null_bits(); + all_valid(null_bits, self.field_count) + } + + fn is_valid_at(&self, idx: usize) -> bool { + unsafe { get_bit_unchecked(self.null_bits(), idx) } + } + + fn get_bool(&self, idx: usize) -> bool { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + let value = &self.data[self.base_offset + offset..]; + value[0] != 0 + } + + fn get_u8(&self, idx: usize) -> u8 { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[self.base_offset + offset] + } + + fn_get_idx!(u16, 2); + fn_get_idx!(u32, 4); + fn_get_idx!(u64, 8); + fn_get_idx!(i8, 1); + fn_get_idx!(i16, 2); + fn_get_idx!(i32, 4); + fn_get_idx!(i64, 8); + fn_get_idx!(f32, 4); + fn_get_idx!(f64, 8); + + fn get_date32(&self, idx: usize) -> i32 { + get_idx!(i32, self, idx, 4) + } + + fn get_date64(&self, idx: usize) -> i64 { + get_idx!(i64, self, idx, 8) + } + + fn get_utf8(&self, idx: usize) -> &str { + self.assert_index_valid(idx); + let offset_size = self.get_u64(idx); + let offset = (offset_size >> 32) as usize; + let len = (offset_size & 0xffff_ffff) as usize; + let varlena_offset = self.base_offset + offset; + let bytes = &self.data[varlena_offset..varlena_offset + len]; + std::str::from_utf8(bytes).unwrap() + } + + fn get_binary(&self, idx: usize) -> &[u8] { + self.assert_index_valid(idx); + let offset_size = self.get_u64(idx); + let offset = (offset_size >> 32) as usize; + let len = (offset_size & 0xffff_ffff) as usize; + let varlena_offset = self.base_offset + offset; + &self.data[varlena_offset..varlena_offset + len] + } + + fn_get_idx_opt!(bool); + fn_get_idx_opt!(u8); + fn_get_idx_opt!(u16); + fn_get_idx_opt!(u32); + fn_get_idx_opt!(u64); + fn_get_idx_opt!(i8); + fn_get_idx_opt!(i16); + fn_get_idx_opt!(i32); + fn_get_idx_opt!(i64); + fn_get_idx_opt!(f32); + fn_get_idx_opt!(f64); + + fn get_date32_opt(&self, idx: usize) -> Option<i32> { + if self.is_valid_at(idx) { + 
Some(self.get_date32(idx)) + } else { + None + } + } + + fn get_date64_opt(&self, idx: usize) -> Option<i64> { + if self.is_valid_at(idx) { + Some(self.get_date64(idx)) + } else { + None + } + } + + fn get_utf8_opt(&self, idx: usize) -> Option<&str> { + if self.is_valid_at(idx) { + Some(self.get_utf8(idx)) + } else { + None + } + } +} + +fn read_row( + row: &RowReader, + batch: &mut MutableRecordBatch, + schema: &Arc<Schema>, +) -> Result<()> { + if row.all_valid() { + for ((col_idx, to), field) in batch + .arrays + .iter_mut() + .enumerate() + .zip(schema.fields().iter()) + { + read_field_null_free(to, field.data_type(), col_idx, row)? + } + } else { + for ((col_idx, to), field) in batch + .arrays + .iter_mut() + .enumerate() + .zip(schema.fields().iter()) + { + read_field(to, field.data_type(), col_idx, row)? + } + } + Ok(()) +} + +fn read_field( + to: &mut Box<dyn ArrayBuilder>, + dt: &DataType, + col_idx: usize, + row: &RowReader, +) -> Result<()> { + use arrow::array::*; + use DataType::*; + match dt { + Boolean => { + let to = to.as_any_mut().downcast_mut::<BooleanBuilder>().unwrap(); + to.append_option(row.get_bool_opt(col_idx))?; + } + UInt8 => { + let to = to.as_any_mut().downcast_mut::<UInt8Builder>().unwrap(); + to.append_option(row.get_u8_opt(col_idx))?; + } + UInt16 => { + let to = to.as_any_mut().downcast_mut::<UInt16Builder>().unwrap(); + to.append_option(row.get_u16_opt(col_idx))?; + } + UInt32 => { + let to = to.as_any_mut().downcast_mut::<UInt32Builder>().unwrap(); + to.append_option(row.get_u32_opt(col_idx))?; + } + UInt64 => { + let to = to.as_any_mut().downcast_mut::<UInt64Builder>().unwrap(); + to.append_option(row.get_u64_opt(col_idx))?; + } + Int8 => { + let to = to.as_any_mut().downcast_mut::<Int8Builder>().unwrap(); + to.append_option(row.get_i8_opt(col_idx))?; + } + Int16 => { + let to = to.as_any_mut().downcast_mut::<Int16Builder>().unwrap(); + to.append_option(row.get_i16_opt(col_idx))?; + } + Int32 => { + let to = 
to.as_any_mut().downcast_mut::<Int32Builder>().unwrap(); + to.append_option(row.get_i32_opt(col_idx))?; + } + Int64 => { + let to = to.as_any_mut().downcast_mut::<Int64Builder>().unwrap(); + to.append_option(row.get_i64_opt(col_idx))?; + } + Float32 => { + let to = to.as_any_mut().downcast_mut::<Float32Builder>().unwrap(); + to.append_option(row.get_f32_opt(col_idx))?; + } + Float64 => { + let to = to.as_any_mut().downcast_mut::<Float64Builder>().unwrap(); + to.append_option(row.get_f64_opt(col_idx))?; + } + Date32 => { + let to = to.as_any_mut().downcast_mut::<Date32Builder>().unwrap(); + to.append_option(row.get_date32_opt(col_idx))?; + } + Date64 => { + let to = to.as_any_mut().downcast_mut::<Date64Builder>().unwrap(); + to.append_option(row.get_date64_opt(col_idx))?; + } + Utf8 => { + let to = to.as_any_mut().downcast_mut::<StringBuilder>().unwrap(); + to.append_option(row.get_utf8_opt(col_idx))?; + } + Binary => { + let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap(); + if row.is_valid_at(col_idx) { + to.append_value(row.get_binary(col_idx))?; + } else { + to.append_null()?; + } + } + _ => unimplemented!(), + } + Ok(()) +} + +fn read_field_null_free( + to: &mut Box<dyn ArrayBuilder>, + dt: &DataType, + col_idx: usize, + row: &RowReader, +) -> Result<()> { + use arrow::array::*; + use DataType::*; + match dt { + Boolean => { + let to = to.as_any_mut().downcast_mut::<BooleanBuilder>().unwrap(); + to.append_value(row.get_bool(col_idx))?; + } + UInt8 => { + let to = to.as_any_mut().downcast_mut::<UInt8Builder>().unwrap(); + to.append_value(row.get_u8(col_idx))?; + } + UInt16 => { + let to = to.as_any_mut().downcast_mut::<UInt16Builder>().unwrap(); + to.append_value(row.get_u16(col_idx))?; + } + UInt32 => { + let to = to.as_any_mut().downcast_mut::<UInt32Builder>().unwrap(); + to.append_value(row.get_u32(col_idx))?; + } + UInt64 => { + let to = to.as_any_mut().downcast_mut::<UInt64Builder>().unwrap(); + to.append_value(row.get_u64(col_idx))?; + } + 
Int8 => { + let to = to.as_any_mut().downcast_mut::<Int8Builder>().unwrap(); + to.append_value(row.get_i8(col_idx))?; + } + Int16 => { + let to = to.as_any_mut().downcast_mut::<Int16Builder>().unwrap(); + to.append_value(row.get_i16(col_idx))?; + } + Int32 => { + let to = to.as_any_mut().downcast_mut::<Int32Builder>().unwrap(); + to.append_value(row.get_i32(col_idx))?; + } + Int64 => { + let to = to.as_any_mut().downcast_mut::<Int64Builder>().unwrap(); + to.append_value(row.get_i64(col_idx))?; + } + Float32 => { + let to = to.as_any_mut().downcast_mut::<Float32Builder>().unwrap(); + to.append_value(row.get_f32(col_idx))?; + } + Float64 => { + let to = to.as_any_mut().downcast_mut::<Float64Builder>().unwrap(); + to.append_value(row.get_f64(col_idx))?; + } + Date32 => { + let to = to.as_any_mut().downcast_mut::<Date32Builder>().unwrap(); + to.append_value(row.get_date32(col_idx))?; + } + Date64 => { + let to = to.as_any_mut().downcast_mut::<Date64Builder>().unwrap(); + to.append_value(row.get_date64(col_idx))?; + } + Utf8 => { + let to = to.as_any_mut().downcast_mut::<StringBuilder>().unwrap(); + to.append_value(row.get_utf8(col_idx))?; + } + Binary => { + let to = to.as_any_mut().downcast_mut::<BinaryBuilder>().unwrap(); + to.append_value(row.get_binary(col_idx))?; + } + _ => unimplemented!(), + } + Ok(()) +} + +struct MutableRecordBatch { Review comment: This feels like something we could upstream into arrow-rs :thinking: ########## File path: datafusion/src/row/writer.rs ########## @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reusable row writer backed by Vec<u8> to stitch attributes together + +use crate::row::bitmap::{bytes_for, set_bit}; +use crate::row::{estimate_row_width, fixed_size, get_offsets, supported}; +use arrow::array::Array; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use std::cmp::max; +use std::sync::Arc; + +/// Append batch from `row_idx` to `output` buffer start from `offset` +/// # Panics +/// +/// This function will panic if the output buffer doesn't have enough space to hold all the rows +pub fn write_batch_unchecked( + output: &mut [u8], + offset: usize, + batch: &RecordBatch, + row_idx: usize, + schema: Arc<Schema>, +) -> Vec<usize> { + let mut writer = RowWriter::new(&schema); + let mut current_offset = offset; + let mut offsets = vec![]; + for cur_row in row_idx..batch.num_rows() { + offsets.push(current_offset); + let row_width = write_row(&mut writer, cur_row, batch, &schema); + output[current_offset..current_offset + row_width] + .copy_from_slice(writer.get_row()); + current_offset += row_width; + writer.reset() + } + offsets +} + +macro_rules! set_idx { + ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); + }}; +} + +macro_rules! fn_set_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! 
{ + fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); + } + } + }; +} + +/// Reusable row writer backed by Vec<u8> +pub struct RowWriter { + data: Vec<u8>, + field_count: usize, + row_width: usize, + null_width: usize, + values_width: usize, + varlena_width: usize, + varlena_offset: usize, + field_offsets: Vec<usize>, Review comment: Some doc strings would be pretty :+1: ########## File path: datafusion/src/row/bitmap/fmt.rs ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Write; + +use super::is_set; + +/// Formats `bytes` taking into account an offset and length of the form +pub fn fmt( + bytes: &[u8], + offset: usize, + length: usize, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { Review comment: It might be a good idea to make this a `struct BitFormatter<'a>` that implements `Display`, similar to the `A` in the test. ########## File path: datafusion/src/row/bitmap/fmt.rs ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Write; + +use super::is_set; + +/// Formats `bytes` taking into account an offset and length of the form Review comment: Possibly worth specifying that `offset` and `length` are in bits ########## File path: datafusion/src/row/bitmap/fmt.rs ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::fmt::Write; + +use super::is_set; + +/// Formats `bytes` taking into account an offset and length of the form +pub fn fmt( + bytes: &[u8], + offset: usize, + length: usize, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + assert!(offset < 8); Review comment: You could compute `offset / 8` and offset `bytes` by this amount (and decrement length) instead of panicking ########## File path: datafusion/src/row/bitmap/fmt.rs ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::fmt::Write; + +use super::is_set; + +/// Formats `bytes` taking into account an offset and length of the form +pub fn fmt( + bytes: &[u8], + offset: usize, + length: usize, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + assert!(offset < 8); + + f.write_char('[')?; + let mut remaining = length; + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let first = bytes[0]; + let bytes = &bytes[1..]; + let empty_before = 8usize.saturating_sub(remaining + offset); + f.write_str("0b")?; + for _ in 0..empty_before { + f.write_char('_')?; + } + let until = std::cmp::min(8, offset + remaining); + for i in offset..until { + if is_set(first, offset + until - 1 - i) { + f.write_char('1')?; + } else { + f.write_char('0')?; + } + } + for _ in 0..offset { + f.write_char('_')?; + } + remaining -= until - offset; + + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let number_of_bytes = remaining / 8; + for byte in &bytes[..number_of_bytes] { + f.write_str(", ")?; + f.write_fmt(format_args!("{:#010b}", byte))?; + } + remaining -= number_of_bytes * 8; + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let last = bytes[std::cmp::min((length + offset + 7) / 8, bytes.len() - 1)]; + let remaining = (length + offset) % 8; + f.write_str(", ")?; + f.write_str("0b")?; + for _ in 0..(8 - remaining) { + f.write_char('_')?; + } + for i in 0..remaining { + if is_set(last, remaining - 1 - i) { + f.write_char('1')?; + } else { + f.write_char('0')?; + } + } + f.write_char(']') +} + +#[cfg(test)] +mod tests { + use super::*; + + struct A<'a>(&'a [u8], usize, usize); + impl<'a> std::fmt::Debug for A<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fmt(self.0, self.1, self.2, f) + } + } + + #[test] + fn test_debug() -> std::fmt::Result { + assert_eq!(format!("{:?}", A(&[1], 0, 0)), "[]"); Review comment: Putting some of this into a doctest might make it easier to see what the function does 
########## File path: datafusion/src/row/reader.rs ########## @@ -0,0 +1,441 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Accessing row from raw bytes + +use crate::error::{DataFusionError, Result}; +use crate::row::bitmap::{all_valid, bytes_for, get_bit_unchecked}; +use crate::row::{get_offsets, supported}; +use arrow::array::{make_builder, ArrayBuilder}; +use arrow::datatypes::{DataType, Schema}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use std::sync::Arc; + +/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch +pub fn read_as_batch( + data: &mut [u8], + schema: Arc<Schema>, + offsets: Vec<usize>, +) -> Result<RecordBatch> { + let row_num = offsets.len(); + let mut output = MutableRecordBatch::new(row_num, schema.clone()); + let mut row = RowReader::new(&schema, data); + + for offset in offsets.iter().take(row_num) { + row.point_to(*offset); + read_row(&row, &mut output, &schema)? + } + + output.output().map_err(DataFusionError::ArrowError) +} + +macro_rules! 
get_idx { + ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + let start = $SELF.base_offset + offset; + let end = start + $WIDTH; + $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap()) + }}; +} + +macro_rules! fn_get_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! { + fn [<get_ $NATIVE>](&self, idx: usize) -> $NATIVE { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + let start = self.base_offset + offset; + let end = start + $WIDTH; + $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap()) + } + } + }; +} + +macro_rules! fn_get_idx_opt { + ($NATIVE: ident) => { + paste::item! { + fn [<get_ $NATIVE _opt>](&self, idx: usize) -> Option<$NATIVE> { + if self.is_valid_at(idx) { + Some(self.[<get_ $NATIVE>](idx)) + } else { + None + } + } + } + }; +} + +struct RowReader<'a> { + data: &'a [u8], + base_offset: usize, + field_count: usize, + null_width: usize, + field_offsets: Vec<usize>, +} + +impl<'a> std::fmt::Debug for RowReader<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let null_bits = self.null_bits(); + super::bitmap::fmt(null_bits, 0, self.null_width, f) + } +} + +impl<'a> RowReader<'a> { + fn new(schema: &Arc<Schema>, data: &'a [u8]) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = bytes_for(field_count); + let (field_offsets, _) = get_offsets(null_width, schema); + Self { + data, + base_offset: 0, + field_count, + null_width, + field_offsets, + } + } + + /// Update this row to point to position `offset` in `base` + fn point_to(&mut self, offset: usize) { + self.base_offset = offset; + } + + #[inline] + fn assert_index_valid(&self, idx: usize) { + assert!(idx < self.field_count); + } + + #[inline(always)] + fn null_bits(&self) -> &[u8] { + let start = self.base_offset; + &self.data[start..start + self.null_width] + } + + 
#[inline(always)] + fn all_valid(&self) -> bool { + let null_bits = self.null_bits(); + all_valid(null_bits, self.field_count) + } + + fn is_valid_at(&self, idx: usize) -> bool { + unsafe { get_bit_unchecked(self.null_bits(), idx) } + } + + fn get_bool(&self, idx: usize) -> bool { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + let value = &self.data[self.base_offset + offset..]; + value[0] != 0 + } + + fn get_u8(&self, idx: usize) -> u8 { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[self.base_offset + offset] + } + + fn_get_idx!(u16, 2); + fn_get_idx!(u32, 4); + fn_get_idx!(u64, 8); + fn_get_idx!(i8, 1); + fn_get_idx!(i16, 2); + fn_get_idx!(i32, 4); + fn_get_idx!(i64, 8); + fn_get_idx!(f32, 4); + fn_get_idx!(f64, 8); + + fn get_date32(&self, idx: usize) -> i32 { + get_idx!(i32, self, idx, 4) + } + + fn get_date64(&self, idx: usize) -> i64 { Review comment: Perhaps these methods should assert that `idx` is actually the corresponding type. Otherwise this is effectively reinterpreting memory as different primitives, which whilst technically safe, is borderline unsafe :grinning: ########## File path: datafusion/src/row/writer.rs ########## @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reusable row writer backed by Vec<u8> to stitch attributes together + +use crate::row::bitmap::{bytes_for, set_bit}; +use crate::row::{estimate_row_width, fixed_size, get_offsets, supported}; +use arrow::array::Array; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use std::cmp::max; +use std::sync::Arc; + +/// Append batch from `row_idx` to `output` buffer start from `offset` +/// # Panics +/// +/// This function will panic if the output buffer doesn't have enough space to hold all the rows +pub fn write_batch_unchecked( + output: &mut [u8], + offset: usize, + batch: &RecordBatch, + row_idx: usize, + schema: Arc<Schema>, +) -> Vec<usize> { + let mut writer = RowWriter::new(&schema); + let mut current_offset = offset; + let mut offsets = vec![]; + for cur_row in row_idx..batch.num_rows() { + offsets.push(current_offset); + let row_width = write_row(&mut writer, cur_row, batch, &schema); + output[current_offset..current_offset + row_width] + .copy_from_slice(writer.get_row()); + current_offset += row_width; + writer.reset() + } + offsets +} + +macro_rules! set_idx { + ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); + }}; +} + +macro_rules! fn_set_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! 
{ + fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); + } + } + }; +} + +/// Reusable row writer backed by Vec<u8> +pub struct RowWriter { + data: Vec<u8>, + field_count: usize, + row_width: usize, + null_width: usize, + values_width: usize, + varlena_width: usize, + varlena_offset: usize, + field_offsets: Vec<usize>, +} + +impl RowWriter { + /// new + pub fn new(schema: &Arc<Schema>) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = bytes_for(field_count); + let (field_offsets, values_width) = get_offsets(null_width, schema); + let mut init_capacity = estimate_row_width(null_width, schema); + if !fixed_size(schema) { + // double the capacity to avoid repeated resize + init_capacity *= 2; + } + Self { + data: vec![0; init_capacity], + field_count, + row_width: 0, + null_width, + values_width, + varlena_width: 0, + varlena_offset: null_width + values_width, + field_offsets, + } + } + + /// Reset the row writer state for new tuple + pub fn reset(&mut self) { + self.data.fill(0); + self.row_width = 0; + self.varlena_width = 0; + self.varlena_offset = self.null_width + self.values_width; + } + + #[inline] + fn assert_index_valid(&self, idx: usize) { + assert!(idx < self.field_count); + } + + fn set_null_at(&mut self, idx: usize) { + let null_bits = &mut self.data[0..self.null_width]; + set_bit(null_bits, idx, false) + } + + fn set_non_null_at(&mut self, idx: usize) { + let null_bits = &mut self.data[0..self.null_width]; + set_bit(null_bits, idx, true) + } + + fn set_bool(&mut self, idx: usize, value: bool) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = if value { 1 } else { 0 }; + } + + fn set_u8(&mut self, idx: usize, value: u8) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + 
self.data[offset] = value; + } + + fn_set_idx!(u16, 2); + fn_set_idx!(u32, 4); + fn_set_idx!(u64, 8); + fn_set_idx!(i16, 2); + fn_set_idx!(i32, 4); + fn_set_idx!(i64, 8); + fn_set_idx!(f32, 4); + fn_set_idx!(f64, 8); + + fn set_i8(&mut self, idx: usize, value: i8) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = value.to_le_bytes()[0]; + } + + fn set_date32(&mut self, idx: usize, value: i32) { + set_idx!(4, self, idx, value) + } + + fn set_date64(&mut self, idx: usize, value: i64) { + set_idx!(8, self, idx, value) + } + + fn set_offset_size(&mut self, idx: usize, size: usize) { + let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64; + self.set_u64(idx, offset_and_size); + } + + fn set_utf8(&mut self, idx: usize, value: &str) { + self.assert_index_valid(idx); + let bytes = value.as_bytes(); + let size = bytes.len(); + self.set_offset_size(idx, size); + let varlena_offset = self.varlena_offset; + self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes); + self.varlena_offset += size; + self.varlena_width += size; + } + + fn set_binary(&mut self, idx: usize, value: &[u8]) { + self.assert_index_valid(idx); + let size = value.len(); + self.set_offset_size(idx, size); + let varlena_offset = self.varlena_offset; + self.data[varlena_offset..varlena_offset + size].copy_from_slice(value); + self.varlena_offset += size; + self.varlena_width += size; + } + + fn current_width(&self) -> usize { + self.null_width + self.values_width + self.varlena_width + } + + /// End each row at 8-byte word boundary. 
+ fn end_padding(&mut self) { + let payload_width = self.current_width(); + self.row_width = (payload_width.saturating_add(7) / 8) * 8; + if self.data.capacity() < self.row_width { + self.data.resize(self.row_width, 0); + } + } + + fn get_row(&self) -> &[u8] { + &self.data[0..self.row_width] + } +} + +/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width +fn write_row( + row: &mut RowWriter, + row_idx: usize, + batch: &RecordBatch, + schema: &Arc<Schema>, +) -> usize { + // Get the row from the batch denoted by row_idx + for ((i, f), col) in schema + .fields() + .iter() + .enumerate() + .zip(batch.columns().iter()) + { + if !col.is_null(row_idx) { + row.set_non_null_at(i); + write_field(i, row_idx, col, f.data_type(), row); + } else { + row.set_null_at(i); + } + } + + row.end_padding(); + row.row_width +} + +fn write_field( + col_idx: usize, + row_idx: usize, + col: &Arc<dyn Array>, + dt: &DataType, + row: &mut RowWriter, +) { + // TODO: JIT compile this + use arrow::array::*; + use DataType::*; + match dt { + Boolean => { + let c = col.as_any().downcast_ref::<BooleanArray>().unwrap(); + row.set_bool(col_idx, c.value(row_idx)); + } + UInt8 => { + let c = col.as_any().downcast_ref::<UInt8Array>().unwrap(); + row.set_u8(col_idx, c.value(row_idx)); + } + UInt16 => { + let c = col.as_any().downcast_ref::<UInt16Array>().unwrap(); + row.set_u16(col_idx, c.value(row_idx)); + } + UInt32 => { + let c = col.as_any().downcast_ref::<UInt32Array>().unwrap(); + row.set_u32(col_idx, c.value(row_idx)); + } + UInt64 => { + let c = col.as_any().downcast_ref::<UInt64Array>().unwrap(); + row.set_u64(col_idx, c.value(row_idx)); + } + Int8 => { + let c = col.as_any().downcast_ref::<Int8Array>().unwrap(); + row.set_i8(col_idx, c.value(row_idx)); + } + Int16 => { + let c = col.as_any().downcast_ref::<Int16Array>().unwrap(); + row.set_i16(col_idx, c.value(row_idx)); + } + Int32 => { + let c = col.as_any().downcast_ref::<Int32Array>().unwrap(); + 
row.set_i32(col_idx, c.value(row_idx)); + } + Int64 => { + let c = col.as_any().downcast_ref::<Int64Array>().unwrap(); + row.set_i64(col_idx, c.value(row_idx)); + } + Float32 => { + let c = col.as_any().downcast_ref::<Float32Array>().unwrap(); + row.set_f32(col_idx, c.value(row_idx)); + } + Float64 => { + let c = col.as_any().downcast_ref::<Float64Array>().unwrap(); + row.set_f64(col_idx, c.value(row_idx)); + } + Date32 => { + let c = col.as_any().downcast_ref::<Date32Array>().unwrap(); + row.set_date32(col_idx, c.value(row_idx)); + } + Date64 => { + let c = col.as_any().downcast_ref::<Date64Array>().unwrap(); + row.set_date64(col_idx, c.value(row_idx)); + } + Utf8 => { + let c = col.as_any().downcast_ref::<StringArray>().unwrap(); + let str = c.value(row_idx); Review comment: ```suggestion let s = c.value(row_idx); ``` ########## File path: datafusion/src/row/writer.rs ########## @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Reusable row writer backed by Vec<u8> to stitch attributes together + +use crate::row::bitmap::{bytes_for, set_bit}; +use crate::row::{estimate_row_width, fixed_size, get_offsets, supported}; +use arrow::array::Array; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use std::cmp::max; +use std::sync::Arc; + +/// Append batch from `row_idx` to `output` buffer start from `offset` +/// # Panics +/// +/// This function will panic if the output buffer doesn't have enough space to hold all the rows +pub fn write_batch_unchecked( + output: &mut [u8], + offset: usize, + batch: &RecordBatch, + row_idx: usize, + schema: Arc<Schema>, +) -> Vec<usize> { + let mut writer = RowWriter::new(&schema); + let mut current_offset = offset; + let mut offsets = vec![]; + for cur_row in row_idx..batch.num_rows() { + offsets.push(current_offset); + let row_width = write_row(&mut writer, cur_row, batch, &schema); + output[current_offset..current_offset + row_width] + .copy_from_slice(writer.get_row()); + current_offset += row_width; + writer.reset() + } + offsets +} + +macro_rules! set_idx { + ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); + }}; +} + +macro_rules! fn_set_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! 
{ + fn [<set_ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); + } + } + }; +} + +/// Reusable row writer backed by Vec<u8> +pub struct RowWriter { + data: Vec<u8>, + field_count: usize, + row_width: usize, + null_width: usize, + values_width: usize, + varlena_width: usize, + varlena_offset: usize, + field_offsets: Vec<usize>, +} + +impl RowWriter { + /// new + pub fn new(schema: &Arc<Schema>) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = bytes_for(field_count); + let (field_offsets, values_width) = get_offsets(null_width, schema); + let mut init_capacity = estimate_row_width(null_width, schema); + if !fixed_size(schema) { + // double the capacity to avoid repeated resize + init_capacity *= 2; + } + Self { + data: vec![0; init_capacity], + field_count, + row_width: 0, + null_width, + values_width, + varlena_width: 0, + varlena_offset: null_width + values_width, + field_offsets, + } + } + + /// Reset the row writer state for new tuple + pub fn reset(&mut self) { + self.data.fill(0); + self.row_width = 0; + self.varlena_width = 0; + self.varlena_offset = self.null_width + self.values_width; + } + + #[inline] + fn assert_index_valid(&self, idx: usize) { + assert!(idx < self.field_count); + } + + fn set_null_at(&mut self, idx: usize) { + let null_bits = &mut self.data[0..self.null_width]; + set_bit(null_bits, idx, false) + } + + fn set_non_null_at(&mut self, idx: usize) { + let null_bits = &mut self.data[0..self.null_width]; + set_bit(null_bits, idx, true) + } + + fn set_bool(&mut self, idx: usize, value: bool) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = if value { 1 } else { 0 }; + } + + fn set_u8(&mut self, idx: usize, value: u8) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + 
self.data[offset] = value; + } + + fn_set_idx!(u16, 2); + fn_set_idx!(u32, 4); + fn_set_idx!(u64, 8); + fn_set_idx!(i16, 2); + fn_set_idx!(i32, 4); + fn_set_idx!(i64, 8); + fn_set_idx!(f32, 4); + fn_set_idx!(f64, 8); + + fn set_i8(&mut self, idx: usize, value: i8) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = value.to_le_bytes()[0]; + } + + fn set_date32(&mut self, idx: usize, value: i32) { + set_idx!(4, self, idx, value) + } + + fn set_date64(&mut self, idx: usize, value: i64) { + set_idx!(8, self, idx, value) + } + + fn set_offset_size(&mut self, idx: usize, size: usize) { + let offset_and_size: u64 = (self.varlena_offset << 32 | size) as u64; + self.set_u64(idx, offset_and_size); + } + + fn set_utf8(&mut self, idx: usize, value: &str) { + self.assert_index_valid(idx); + let bytes = value.as_bytes(); + let size = bytes.len(); + self.set_offset_size(idx, size); + let varlena_offset = self.varlena_offset; + self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes); + self.varlena_offset += size; + self.varlena_width += size; + } + + fn set_binary(&mut self, idx: usize, value: &[u8]) { + self.assert_index_valid(idx); + let size = value.len(); + self.set_offset_size(idx, size); + let varlena_offset = self.varlena_offset; + self.data[varlena_offset..varlena_offset + size].copy_from_slice(value); + self.varlena_offset += size; + self.varlena_width += size; + } + + fn current_width(&self) -> usize { + self.null_width + self.values_width + self.varlena_width + } + + /// End each row at 8-byte word boundary. + fn end_padding(&mut self) { + let payload_width = self.current_width(); + self.row_width = (payload_width.saturating_add(7) / 8) * 8; Review comment: `ceil(payload_width, 64) / 8` ########## File path: datafusion/src/row/bitmap/fmt.rs ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Write; + +use super::is_set; + +/// Formats `bytes` taking into account an offset and length of the form +pub fn fmt( + bytes: &[u8], + offset: usize, + length: usize, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + assert!(offset < 8); + + f.write_char('[')?; + let mut remaining = length; + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let first = bytes[0]; + let bytes = &bytes[1..]; + let empty_before = 8usize.saturating_sub(remaining + offset); + f.write_str("0b")?; + for _ in 0..empty_before { + f.write_char('_')?; + } + let until = std::cmp::min(8, offset + remaining); + for i in offset..until { Review comment: `(offset..until).rev()` might be easier to follow than `offset + until - 1 - i` ########## File path: datafusion/src/row/bitmap/fmt.rs ########## @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Write; + +use super::is_set; + +/// Formats `bytes` taking into account an offset and length of the form +pub fn fmt( + bytes: &[u8], + offset: usize, + length: usize, + f: &mut std::fmt::Formatter<'_>, +) -> std::fmt::Result { + assert!(offset < 8); + + f.write_char('[')?; + let mut remaining = length; + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let first = bytes[0]; + let bytes = &bytes[1..]; + let empty_before = 8usize.saturating_sub(remaining + offset); + f.write_str("0b")?; + for _ in 0..empty_before { + f.write_char('_')?; + } + let until = std::cmp::min(8, offset + remaining); + for i in offset..until { + if is_set(first, offset + until - 1 - i) { + f.write_char('1')?; + } else { + f.write_char('0')?; + } + } + for _ in 0..offset { + f.write_char('_')?; + } + remaining -= until - offset; + + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let number_of_bytes = remaining / 8; + for byte in &bytes[..number_of_bytes] { + f.write_str(", ")?; + f.write_fmt(format_args!("{:#010b}", byte))?; + } + remaining -= number_of_bytes * 8; + if remaining == 0 { + f.write_char(']')?; + return Ok(()); + } + + let last = bytes[std::cmp::min((length + offset + 7) / 8, bytes.len() - 1)]; + let remaining = (length + offset) % 8; + f.write_str(", ")?; + f.write_str("0b")?; + for _ in 0..(8 - remaining) { + f.write_char('_')?; + } + for i in 0..remaining { Review comment: `(0..remaining).rev()` maybe -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
