This is an automated email from the ASF dual-hosted git repository. richox pushed a commit to branch dev-columnar-agg-buf in repository https://gitbox.apache.org/repos/asf/auron.git
commit e04788a2b1c9f021efb12836de6703b9ef599df5 Author: zhangli20 <[email protected]> AuthorDate: Wed Jan 21 17:48:44 2026 +0800 Implement columnar aggregate buffers --- native-engine/datafusion-ext-plans/src/agg/acc.rs | 210 +++++++++------------ native-engine/datafusion-ext-plans/src/agg/agg.rs | 1 + .../datafusion-ext-plans/src/agg/agg_ctx.rs | 85 +++++---- .../datafusion-ext-plans/src/agg/agg_table.rs | 12 +- native-engine/datafusion-ext-plans/src/agg/avg.rs | 23 ++- .../datafusion-ext-plans/src/agg/bloom_filter.rs | 43 +++-- .../src/agg/brickhouse/collect.rs | 4 + .../src/agg/brickhouse/combine_unique.rs | 4 + .../datafusion-ext-plans/src/agg/collect.rs | 40 +++- .../datafusion-ext-plans/src/agg/count.rs | 143 +++----------- .../datafusion-ext-plans/src/agg/first.rs | 36 ++-- .../src/agg/first_ignores_null.rs | 18 +- .../datafusion-ext-plans/src/agg/maxmin.rs | 13 +- .../src/agg/spark_udaf_wrapper.rs | 46 +++-- native-engine/datafusion-ext-plans/src/agg/sum.rs | 20 +- .../sql/execution/auron/plan/NativeAggExec.scala | 13 -- .../apache/spark/sql/auron/AuronConverters.scala | 15 +- .../sql/auron/AuronSparkSessionExtension.scala | 2 +- .../org/apache/spark/sql/auron/NativeHelper.scala | 2 +- .../spark/sql/auron/util/TaskContextHelper.scala | 2 +- .../sql/execution/auron/plan/NativeAggBase.scala | 54 +++--- .../NativeParquetInsertIntoHiveTableBase.scala | 7 +- 22 files changed, 375 insertions(+), 418 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs index d3ce8049..eab4e183 100644 --- a/native-engine/datafusion-ext-plans/src/agg/acc.rs +++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs @@ -15,7 +15,7 @@ use std::{ any::Any, - io::{Cursor, Read, Write}, + io::{Read, Write}, sync::Arc, }; @@ -25,7 +25,7 @@ use arrow::{ }; use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use bitvec::{bitvec, vec::BitVec}; -use byteorder::{ReadBytesExt, WriteBytesExt}; +use byteorder::WriteBytesExt; use datafusion::common::{Result, ScalarValue, utils::proxy::VecAllocExt}; use datafusion_ext_commons::{ SliceAsRawBytes, UninitializedInit, df_execution_err, downcast_any, @@ -43,9 +43,9 @@ pub trait AccColumn: Send { fn shrink_to_fit(&mut self); fn num_records(&self) -> usize; fn mem_used(&self) -> usize; - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()>; - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()>; - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()>; + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>>; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()>; + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()>; fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()>; fn ensure_size(&mut self, idx: IdxSelection<'_>) { @@ -137,8 +137,7 @@ impl AccBooleanColumn { } } - pub fn to_array(&self, dt: &DataType, idx: IdxSelection<'_>) -> Result<ArrayRef> { - assert!(dt == &DataType::Boolean); + pub fn to_array(&self, idx: IdxSelection<'_>) -> Result<ArrayRef> { idx_with_iter!((idx @ idx) => { Ok(Arc::new(BooleanArray::from_iter( idx.map(|i| self.valids[i].then_some(self.values[i])) @@ -174,38 +173,32 @@ impl AccColumn for AccBooleanColumn { self.num_records() / 4 // 2 bits for each value } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - if self.valids[i] { - w.write_u8(1 + self.values[i] as u8)?; - } else { - w.write_u8(0)?; - } - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(idx)?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.resize(0); + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(&arrays[0], BooleanArray)?; + self.values.clear(); + self.valids.clear(); - for cursor in cursors { - match cursor.read_u8()? { - 0 => { - self.valids.push(false); - self.values.push(false); - } - v => { - self.valids.push(true); - self.values.push(v - 1 != 0); - } + // fill values + for i in 0..array.len() { + self.values.push(array.value(i)); + } + + // fill valids + if let Some(nb) = array.nulls() { + for bit in nb { + self.valids.push(bit); } } + self.valids.resize(self.values.len(), true); Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { let mut buf = vec![]; idx_for! { @@ -240,13 +233,15 @@ impl AccColumn for AccBooleanColumn { pub struct AccPrimColumn<T: ArrowNativeType> { values: Vec<T>, valids: BitVec, + dt: DataType, } impl<T: ArrowNativeType> AccPrimColumn<T> { - pub fn new(num_records: usize) -> Self { + pub fn new(num_records: usize, dt: DataType) -> Self { Self { values: vec![T::default(); num_records], valids: bitvec![0; num_records], + dt, } } @@ -329,39 +324,30 @@ impl<T: ArrowNativeType> AccColumn for AccPrimColumn<T> { self.values.allocated_size() + (self.valids.capacity() + 7) / 8 } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - if self.valids[i] { - w.write_u8(1)?; - w.write_all([self.values[i]].as_raw_bytes())?; - } else { - w.write_u8(0)?; - } - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(&self.dt, idx)?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array_data = arrays[0].to_data(); self.resize(0); - let mut value_buf = [T::default()]; - - for cursor in cursors { - let valid = cursor.read_u8()?; - if valid == 1 { - cursor.read_exact(value_buf.as_raw_bytes_mut())?; - self.values.push(value_buf[0]); - self.valids.push(true); - } else { - self.values.push(T::default()); - self.valids.push(false); + + // fill values + let buffer = array_data.buffer::<T>(0); + self.values = buffer[array_data.offset()..][..array_data.len()].to_vec(); + + // fill valids + if let Some(nb) = array_data.nulls() { + for bit in nb { + self.valids.push(bit); } } + self.valids.resize(self.values.len(), true); Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { // write valids let mut bits: BitVec<u8> = BitVec::with_capacity(idx.len()); idx_for! { @@ -411,13 +397,15 @@ impl<T: ArrowNativeType> AccColumn for AccPrimColumn<T> { pub struct AccBytesColumn { items: Vec<Option<AccBytes>>, heap_mem_used: usize, + dt: DataType, } impl AccBytesColumn { - pub fn new(num_records: usize) -> Self { + pub fn new(num_records: usize, dt: DataType) -> Self { Self { items: vec![None; num_records], heap_mem_used: 0, + dt, } } @@ -436,13 +424,13 @@ impl AccBytesColumn { self.heap_mem_used += self.item_heap_mem_used(idx); } - fn to_array(&self, dt: &DataType, idx: IdxSelection<'_>) -> Result<ArrayRef> { + fn to_array(&self, idx: IdxSelection<'_>) -> Result<ArrayRef> { let binary; idx_with_iter!((idx @ idx) => { binary = BinaryArray::from_iter(idx.map(|i| self.items[i].as_ref())); }); - match dt { + match &self.dt { DataType::Utf8 => Ok(make_array( binary .to_data() @@ -451,7 +439,7 @@ impl AccBytesColumn { .build()?, )), DataType::Binary => Ok(Arc::new(binary)), - _ => df_execution_err!("expected string or binary type, got {dt:?}"), + dt => df_execution_err!("expected string or binary type, got {dt:?}"), } } @@ -532,25 +520,34 @@ impl AccColumn for AccBytesColumn { self.heap_mem_used + self.items.allocated_size() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - self.save_value(i, w)?; - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(idx)?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.items.resize(0, Default::default()); - for cursor in cursors { - self.load_value(cursor)?; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array_data = arrays[0].to_data(); + self.items.clear(); + + // fill values + let data_buffer = array_data.buffer::<u8>(0); + let offset_buffer = array_data.buffer::<i32>(1); + for i in 0..array_data.len() { + if array_data.is_valid(i) { + let offset_begin = offset_buffer[array_data.offset() + i] as usize; + let offset_end = offset_buffer[array_data.offset() + i + 1] as usize; + self.items.push(Some(AccBytes::from_slice( + &data_buffer[offset_begin..offset_end], + ))); + } else { + self.items.push(None); + } } self.refresh_heap_mem_used(); Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! { (idx in idx) => { self.save_value(idx, w)?; @@ -576,17 +573,17 @@ pub struct AccScalarValueColumn { } impl AccScalarValueColumn { - pub fn new(dt: &DataType, num_rows: usize) -> Self { - let null_value = ScalarValue::try_from(dt).expect("unsupported data type"); + pub fn new(dt: DataType, num_rows: usize) -> Self { + let null_value = ScalarValue::try_from(&dt).expect("unsupported data type"); Self { items: (0..num_rows).map(|_| null_value.clone()).collect(), - dt: dt.clone(), + dt, null_value, heap_mem_used: 0, } } - pub fn to_array(&mut self, _dt: &DataType, idx: IdxSelection<'_>) -> Result<ArrayRef> { + pub fn to_array(&mut self, idx: IdxSelection<'_>) -> Result<ArrayRef> { idx_with_iter!((idx @ idx) => { ScalarValue::iter_to_array(idx.map(|i| { std::mem::replace(&mut self.items[i], self.null_value.clone()) @@ -642,38 +639,36 @@ impl AccColumn for AccScalarValueColumn { self.heap_mem_used + self.items.allocated_size() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - write_scalar(&self.items[i], true, w)?; - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(idx)?; // data type is not used + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.items.truncate(0); + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = &arrays[0]; + + self.items.clear(); self.heap_mem_used = 0; - for cursor in cursors { - let scalar = read_scalar(cursor, &self.dt, true)?; + for i in 0..array.len() { + let scalar = ScalarValue::try_from_array(array, i)?; self.heap_mem_used += scalar_value_heap_mem_size(&scalar); - self.items.push(scalar); } Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! { (idx in idx) => { - write_scalar(&self.items[idx], true, w)?; + let scalar = self.take_value(idx); + write_scalar(&scalar, true, w)?; } } Ok(()) } fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - self.items.truncate(0); + self.items.clear(); self.heap_mem_used = 0; for _ in 0..num_rows { @@ -685,43 +680,18 @@ impl AccColumn for AccScalarValueColumn { } } -pub fn create_acc_generic_column(dt: &DataType, num_rows: usize) -> AccColumnRef { +pub fn create_acc_generic_column(dt: DataType, num_rows: usize) -> AccColumnRef { macro_rules! primitive_helper { ($t:ty) => { Box::new(AccPrimColumn::<<$t as ArrowPrimitiveType>::Native>::new( - num_rows, + num_rows, dt, )) }; } downcast_primitive! { dt => (primitive_helper), DataType::Boolean => Box::new(AccBooleanColumn::new(num_rows)), - DataType::Utf8 | DataType::Binary => Box::new(AccBytesColumn::new(num_rows)), + DataType::Utf8 | DataType::Binary => Box::new(AccBytesColumn::new(num_rows, dt)), other => Box::new(AccScalarValueColumn::new(other, num_rows)), } } - -pub fn acc_generic_column_to_array( - column: &mut AccColumnRef, - dt: &DataType, - idx: IdxSelection<'_>, -) -> Result<ArrayRef> { - macro_rules! primitive_helper { - ($t:ty) => { - downcast_any!(column, mut AccPrimColumn::<<$t as ArrowPrimitiveType>::Native>)? - .to_array(dt, idx) - }; - } - downcast_primitive! { - dt => (primitive_helper), - DataType::Boolean => { - downcast_any!(column, mut AccBooleanColumn)?.to_array(dt, idx) - } - DataType::Utf8 | DataType::Binary => { - downcast_any!(column, mut AccBytesColumn)?.to_array(dt, idx) - } - _other => { - downcast_any!(column, mut AccScalarValueColumn)?.to_array(dt, idx) - } - } -} diff --git a/native-engine/datafusion-ext-plans/src/agg/agg.rs b/native-engine/datafusion-ext-plans/src/agg/agg.rs index b43980ed..5eb4c3da 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg.rs @@ -44,6 +44,7 @@ pub trait Agg: Send + Sync + Debug { fn data_type(&self) -> &DataType; fn nullable(&self) -> bool; fn create_acc_column(&self, num_rows: usize) -> AccColumnRef; + fn acc_array_data_types(&self) -> &[DataType]; fn with_new_exprs(&self, exprs: Vec<PhysicalExprRef>) -> Result<Arc<dyn Agg>>; fn prepare_partial_args(&self, partial_inputs: &[ArrayRef]) -> Result<Vec<ArrayRef>> { diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs b/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs index 2794684f..d36876eb 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs @@ -15,13 +15,12 @@ use std::{ fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; use arrow::{ - array::{ArrayRef, BinaryArray, RecordBatchOptions}, - datatypes::{DataType, Field, Fields, Schema, SchemaRef}, + array::{Array, ArrayRef, RecordBatchOptions}, + datatypes::{Field, Fields, Schema, SchemaRef}, record_batch::RecordBatch, row::{RowConverter, Rows, SortField}, }; @@ -29,20 +28,16 @@ use auron_jni_bridge::{ conf, conf::{BooleanConf, DoubleConf, IntConf}, }; -use datafusion::{ - common::{Result, cast::as_binary_array}, - physical_expr::PhysicalExprRef, -}; +use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{downcast_any, suggested_batch_mem_size}; use once_cell::sync::OnceCell; use parking_lot::Mutex; use crate::{ agg::{ - AGG_BUF_COLUMN_NAME, AggExecMode, AggExpr, AggMode, GroupingExpr, + AggExecMode, AggExpr, AggMode, GroupingExpr, acc::AccTable, agg::{Agg, IdxSelection}, - agg_hash_map::AggHashMapKey, spark_udaf_wrapper::{AccUDAFBufferRowsColumn, SparkUDAFMemTracker, SparkUDAFWrapper}, }, common::{ @@ -63,6 +58,8 @@ pub struct AggContext { pub grouping_row_converter: Arc<Mutex<RowConverter>>, pub groupings: Vec<GroupingExpr>, pub aggs: Vec<AggExpr>, + pub input_acc_arrays_len: usize, + pub output_acc_arrays_len: usize, pub supports_partial_skipping: bool, pub partial_skipping_ratio: f64, pub partial_skipping_min_rows: usize, @@ -137,7 +134,11 @@ impl AggContext { )); } } else { - agg_fields.push(Field::new(AGG_BUF_COLUMN_NAME, DataType::Binary, false)); + for agg in &aggs { + for dt in agg.agg.acc_array_data_types() { + agg_fields.push(Field::new("", dt.clone(), true)); + } + } } let agg_schema = Arc::new(Schema::new(agg_fields)); let output_schema = Arc::new(Schema::new( @@ -148,6 +149,17 @@ impl AggContext { .concat(), )); + let input_acc_arrays_len = aggs + .iter() + .filter(|agg| agg.mode.is_partial_merge() || agg.mode.is_final()) + .map(|agg| agg.agg.acc_array_data_types().len()) + .sum(); + let output_acc_arrays_len = aggs + .iter() + .filter(|agg| agg.mode.is_partial() || agg.mode.is_partial_merge()) + .map(|agg| agg.agg.acc_array_data_types().len()) + .sum(); + let agg_exprs_flatten: Vec<PhysicalExprRef> = aggs .iter() .filter(|agg| agg.mode.is_partial()) @@ -195,6 +207,8 @@ impl AggContext { grouping_row_converter, groupings, aggs, + input_acc_arrays_len, + output_acc_arrays_len, agg_expr_evaluator, supports_partial_skipping, partial_skipping_ratio, @@ -277,22 +291,15 @@ impl AggContext { let mut merging_acc_table = self.create_acc_table(0); if self.need_partial_merge { - let partial_merged_array = - as_binary_array(batch.columns().last().expect("last column"))?; - let array = partial_merged_array - .iter() - .skip(batch_start_idx) - .take(batch_end_idx - batch_start_idx) - .map(|bytes| bytes.expect("non-null bytes")) - .collect::<Vec<_>>(); - let mut cursors = array - .iter() - .map(|bytes| Cursor::new(bytes.as_bytes())) - .collect::<Vec<_>>(); - - for (agg_idx, _agg) in &self.need_partial_merge_aggs { - let acc_col = &mut merging_acc_table.cols_mut()[*agg_idx]; - acc_col.unfreeze_from_rows(&mut cursors)?; + let mut acc_arrays_start = batch.num_columns() - self.input_acc_arrays_len; + for (agg_idx, agg) in &self.need_partial_merge_aggs { + let acc_arrays = &batch.columns()[acc_arrays_start..] + .iter() + .take(agg.acc_array_data_types().len()) + .map(|array| array.slice(batch_start_idx, batch_end_idx - batch_start_idx)) + .collect::<Vec<_>>(); + acc_arrays_start += agg.acc_array_data_types().len(); + merging_acc_table.cols_mut()[*agg_idx].unfreeze_from_arrays(&acc_arrays)?; } } let batch_selection = IdxSelection::Range(0, batch_end_idx - batch_start_idx); @@ -320,9 +327,7 @@ impl AggContext { } Ok(agg_columns) } else { - // output acc as a binary column - let freezed = self.freeze_acc_table(acc_table, idx)?; - Ok(vec![Arc::new(BinaryArray::from_iter_values(freezed))]) + self.freeze_acc_table(acc_table, idx) } } @@ -407,23 +412,23 @@ impl AggContext { pub fn freeze_acc_table( &self, - acc_table: &AccTable, + acc_table: &mut AccTable, acc_idx: IdxSelection, - ) -> Result<Vec<Vec<u8>>> { + ) -> Result<Vec<ArrayRef>> { let udaf_indices_cache = OnceCell::new(); - let mut vec = vec![vec![]; acc_idx.len()]; - for acc_col in acc_table.cols() { + let mut arrays = vec![]; + + for acc_col in acc_table.cols_mut() { if let Ok(udaf_acc_col) = downcast_any!(acc_col, AccUDAFBufferRowsColumn) { - udaf_acc_col.freeze_to_rows_with_indices_cache( - acc_idx, - &mut vec, - &udaf_indices_cache, - )?; + arrays.push( + udaf_acc_col + .freeze_to_array_with_indices_cache(acc_idx, &udaf_indices_cache)?, + ); } else { - acc_col.freeze_to_rows(acc_idx, &mut vec)?; + arrays.extend(acc_col.freeze_to_arrays(acc_idx)?); } } - Ok(vec) + Ok(arrays) } pub async fn process_partial_skipped( diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index c432c892..fc3a6617 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -544,7 +544,7 @@ impl HashingData { // sort all records using radix sort on hashcodes of keys let num_spill_buckets = self.agg_ctx.num_spill_buckets(self.mem_used()); let key_rows = self.map.into_keys(); - let acc_table = self.acc_table; + let mut acc_table = self.acc_table; let mut entries = key_rows .iter() .enumerate() @@ -568,7 +568,7 @@ impl HashingData { write_spill_bucket( &mut writer, &self.agg_ctx, - &acc_table, + &mut acc_table, entries[offset..][..cur_bucket_count] .iter() .map(|&(_, record_idx)| &key_rows[record_idx as usize]), @@ -674,7 +674,7 @@ impl MergingData { entries.shrink_to_fit(); let key_rows = self.key_rows; - let acc_table = self.acc_table; + let mut acc_table = self.acc_table; let mut bucket_counts = vec![0; num_spill_buckets]; radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| { @@ -693,7 +693,7 @@ impl MergingData { write_spill_bucket( &mut writer, &self.agg_ctx, - &acc_table, + &mut acc_table, entries[offset..][..cur_bucket_count].iter().map( |&(_, batch_idx, row_idx, _)| { key_rows[batch_idx as usize] @@ -722,7 +722,7 @@ impl MergingData { fn write_spill_bucket( w: &mut SpillCompressedWriter, agg_ctx: &AggContext, - acc_table: &AccTable, + acc_table: &mut AccTable, key_iter: impl Iterator<Item = impl AsRef<[u8]>>, acc_idx_iter: impl Iterator<Item = usize>, spill_idx: usize, @@ -730,7 +730,7 @@ fn write_spill_bucket( // write accs let udaf_indices_cache = OnceCell::new(); let acc_indices: Vec<usize> = acc_idx_iter.collect(); - for col in acc_table.cols() { + for col in acc_table.cols_mut() { if let Ok(udaf_col) = downcast_any!(col, AccUDAFBufferRowsColumn) { udaf_col.spill_with_indices_cache( IdxSelection::Indices(&acc_indices), diff --git a/native-engine/datafusion-ext-plans/src/agg/avg.rs b/native-engine/datafusion-ext-plans/src/agg/avg.rs index 008dd34b..47c5874b 100644 --- a/native-engine/datafusion-ext-plans/src/agg/avg.rs +++ b/native-engine/datafusion-ext-plans/src/agg/avg.rs @@ -16,7 +16,6 @@ use std::{ any::Any, fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; @@ -44,6 +43,7 @@ pub struct AggAvg { data_type: DataType, agg_sum: AggSum, agg_count: AggCount, + acc_array_data_types: Vec<DataType>, } impl AggAvg { @@ -51,6 +51,7 @@ impl AggAvg { let agg_sum = AggSum::try_new(child.clone(), data_type.clone())?; let agg_count = AggCount::try_new(vec![child.clone()], DataType::Int64)?; Ok(Self { + acc_array_data_types: vec![data_type.clone(), DataType::Int64], child, data_type, agg_sum, @@ -104,6 +105,10 @@ impl Agg for AggAvg { }) } + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -206,19 +211,19 @@ impl AccColumn for AccAvgColumn { self.sum.mem_used() + self.count.mem_used() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - self.sum.freeze_to_rows(idx, array)?; - self.count.freeze_to_rows(idx, array)?; - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let sum_array = self.sum.freeze_to_arrays(idx)?[0].clone(); + let count_array = self.count.freeze_to_arrays(idx)?[0].clone(); + Ok(vec![sum_array, count_array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.sum.unfreeze_from_rows(cursors)?; - self.count.unfreeze_from_rows(cursors)?; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + self.sum.unfreeze_from_arrays(&arrays[0..1])?; + self.count.unfreeze_from_arrays(&arrays[1..2])?; Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, buf: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, buf: &mut SpillCompressedWriter) -> Result<()> { self.sum.spill(idx, buf)?; self.count.spill(idx, buf)?; Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs index 6aff3c84..4688e131 100644 --- a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs +++ b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs @@ -21,7 +21,7 @@ use std::{ }; use arrow::{ - array::{ArrayRef, AsArray, BinaryBuilder}, + array::{ArrayRef, AsArray, BinaryArray, BinaryBuilder}, datatypes::{DataType, Int64Type}, }; use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; @@ -108,6 +108,10 @@ impl Agg for AggBloomFilter { bloom_filters } + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Binary] + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -245,36 +249,41 @@ impl AccColumn for AccBloomFilterColumn { .sum() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let mut array = vec![]; + idx_for! { (idx in idx) => { - let w = &mut array[idx]; if let Some(bloom_filter) = &self.bloom_filters[idx] { + let mut w = vec![]; w.write_u8(1)?; - bloom_filter.write_to(w)?; + bloom_filter.write_to(&mut w)?; + array.push(Some(w)); } else { - w.write_u8(0)?; + array.push(None); } } } - Ok(()) + + let array = Arc::new(BinaryArray::from_iter(array)); + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - assert_eq!(self.num_records(), 0, "expect empty AccColumn"); - for r in cursors { - self.bloom_filters.push({ - if r.read_u8()? == 1 { - Some(SparkBloomFilter::read_from(r)?) - } else { - None - } - }); + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(arrays[0], BinaryArray)?; + + for v in array { + if let Some(w) = v { + self.bloom_filters + .push(Some(SparkBloomFilter::read_from(&mut Cursor::new(w))?)); + } else { + self.bloom_filters.push(None); + } } Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! { (idx in idx) => { if let Some(bloom_filter) = &self.bloom_filters[idx] { diff --git a/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs b/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs index 8d982b48..f7aeb69e 100644 --- a/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs @@ -81,6 +81,10 @@ impl Agg for AggCollect { self.innert_collect_list.create_acc_column(num_rows) } + fn acc_array_data_types(&self) -> &[DataType] { + self.innert_collect_list.acc_array_data_types() + } + fn partial_update( &self, accs: &mut AccColumnRef, diff --git a/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs b/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs index 4e7144c6..4330e1cd 100644 --- a/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs +++ b/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs @@ -81,6 +81,10 @@ impl Agg for AggCombineUnique { self.inner_collect_set.create_acc_column(num_rows) } + fn acc_array_data_types(&self) -> &[DataType] { + self.inner_collect_set.acc_array_data_types() + } + fn partial_update( &self, accs: &mut AccColumnRef, diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index 225d4303..b20acafd 100644 --- a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -116,6 +116,10 @@ impl<C: AccCollectionColumn> Agg for AggGenericCollect<C> { col } + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Binary] + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -292,15 +296,23 @@ impl AccColumn for AccSetColumn { self.mem_used + self.set.capacity() * size_of::<AccSet>() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - AccCollectionColumn::freeze_to_rows(self, idx, array) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let mut array = vec![vec![]; idx.len()]; + AccCollectionColumn::freeze_to_rows(self, idx, &mut array)?; + Ok(vec![Arc::new(BinaryArray::from_iter_values(array))]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - AccCollectionColumn::unfreeze_from_rows(self, cursors) + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(arrays[0], BinaryArray)?; + let mut cursors = vec![]; + + for i in 0..array.len() { + cursors.push(Cursor::new(array.value(i))); + } + AccCollectionColumn::unfreeze_from_rows(self, &mut cursors) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! { (idx in idx) => { self.save_raw(idx, w)?; @@ -404,15 +416,23 @@ impl AccColumn for AccListColumn { self.mem_used + self.list.capacity() * size_of::<AccList>() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - AccCollectionColumn::freeze_to_rows(self, idx, array) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let mut array = vec![vec![]; idx.len()]; + AccCollectionColumn::freeze_to_rows(self, idx, &mut array)?; + Ok(vec![Arc::new(BinaryArray::from_iter_values(array))]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - AccCollectionColumn::unfreeze_from_rows(self, cursors) + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(arrays[0], BinaryArray)?; + let mut cursors = vec![]; + + for i in 0..array.len() { + cursors.push(Cursor::new(array.value(i))); + } + AccCollectionColumn::unfreeze_from_rows(self, &mut cursors) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! { (idx in idx) => { self.save_raw(idx, w)?; diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs b/native-engine/datafusion-ext-plans/src/agg/count.rs index ab17c5d3..1b5a47f4 100644 --- a/native-engine/datafusion-ext-plans/src/agg/count.rs +++ b/native-engine/datafusion-ext-plans/src/agg/count.rs @@ -16,24 +16,19 @@ use std::{ any::Any, fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; use arrow::{array::*, datatypes::*}; -use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; -use datafusion_ext_commons::{ - downcast_any, - io::{read_len, write_len}, -}; +use datafusion_ext_commons::downcast_any; use crate::{ agg::{ - acc::{AccColumn, AccColumnRef}, + acc::{AccColumn, AccColumnRef, AccPrimColumn}, agg::{Agg, IdxSelection}, }, - idx_for, idx_for_zipped, idx_with_iter, + idx_for_zipped, }; pub struct AggCount { @@ -82,9 +77,11 @@ impl Agg for AggCount { } fn create_acc_column(&self, num_rows: usize) -> Box<dyn AccColumn> { - Box::new(AccCountColumn { - values: vec![0; num_rows], - }) + Box::new(AccPrimColumn::<i64>::new(num_rows, DataType::Int64)) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Int64] } fn partial_update( @@ -94,32 +91,15 @@ impl Agg for AggCount { partial_args: &[ArrayRef], partial_arg_idx: IdxSelection<'_>, ) -> Result<()> { - let accs = downcast_any!(accs, mut AccCountColumn)?; + let accs = downcast_any!(accs, mut AccPrimColumn<i64>)?; accs.ensure_size(acc_idx); - if partial_args.is_empty() { - idx_for_zipped! { - ((acc_idx, _partial_arg_idx) in (acc_idx, partial_arg_idx)) => { - if acc_idx >= accs.values.len() { - accs.values.push(1); - } else { - accs.values[acc_idx] += 1; - } - } - } - } else { - idx_for_zipped! { - ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { - let add = partial_args - .iter() - .all(|arg| arg.is_valid(partial_arg_idx)) as i64; - - if acc_idx >= accs.values.len() { - accs.values.push(add); - } else { - accs.values[acc_idx] += add; - } - } + idx_for_zipped! { + ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { + let add = partial_args + .iter() + .all(|arg| arg.is_valid(partial_arg_idx)) as i64; + accs.set_value(acc_idx, Some(accs.value(acc_idx).unwrap_or(0) + add)); } } Ok(()) @@ -132,98 +112,25 @@ impl Agg for AggCount { merging_accs: &mut AccColumnRef, merging_acc_idx: IdxSelection<'_>, ) -> Result<()> { - let accs = downcast_any!(accs, mut AccCountColumn)?; - let merging_accs = downcast_any!(merging_accs, mut AccCountColumn)?; + let accs = downcast_any!(accs, mut AccPrimColumn<i64>)?; + let merging_accs = downcast_any!(merging_accs, mut AccPrimColumn<i64>)?; accs.ensure_size(acc_idx); idx_for_zipped! { ((acc_idx, merging_acc_idx) in (acc_idx, merging_acc_idx)) => { - if acc_idx < accs.values.len() { - accs.values[acc_idx] += merging_accs.values[merging_acc_idx]; - } else { - accs.values.push(merging_accs.values[merging_acc_idx]); - } + let v = match (accs.value(acc_idx), merging_accs.value(merging_acc_idx)) { + (Some(a), Some(b)) => Some(a + b), + (Some(a), _) => Some(a), + (_, Some(b)) => Some(b), + _ => Some(0), + }; + accs.set_value(acc_idx, v); } } Ok(()) } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - let accs = downcast_any!(accs, mut AccCountColumn)?; - - idx_with_iter! { - (acc_idx_iter @ acc_idx) => { - Ok(Arc::new(Int64Array::from_iter_values( - acc_idx_iter.map(|idx| accs.values[idx]) - ))) - } - } - } -} - -pub struct AccCountColumn { - pub values: Vec<i64>, -} - -impl AccColumn for AccCountColumn { - fn as_any(&self) -> &dyn Any { - self - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } - - fn resize(&mut self, num_accs: usize) { - self.values.resize(num_accs, 0); - } - - fn shrink_to_fit(&mut self) { - self.values.shrink_to_fit(); - } - - fn num_records(&self) -> usize { - self.values.len() - } - - fn mem_used(&self) -> usize { - self.values.capacity() * 2 * size_of::<i64>() - } - - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - let mut array_idx = 0; - - idx_for! { - (idx in idx) => { - write_len(self.values[idx] as usize, &mut array[array_idx])?; - array_idx += 1; - } - } - Ok(()) - } - - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - assert_eq!(self.num_records(), 0, "expect empty AccColumn"); - for cursor in cursors { - self.values.push(read_len(cursor)? as i64); - } - Ok(()) - } - - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - idx_for! { - (idx in idx) => { - write_len(self.values[idx] as usize, w)?; - } - } - Ok(()) - } - - fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - assert_eq!(self.num_records(), 0, "expect empty AccColumn"); - for _ in 0..num_rows { - self.values.push(read_len(r)? as i64); - } - Ok(()) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs b/native-engine/datafusion-ext-plans/src/agg/first.rs index b4bbc115..560cc5d3 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first.rs @@ -16,7 +16,6 @@ use std::{ any::Any, fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; @@ -33,7 +32,7 @@ use crate::{ Agg, acc::{ AccBooleanColumn, AccBytes, AccBytesColumn, AccColumn, AccColumnRef, AccPrimColumn, - AccScalarValueColumn, acc_generic_column_to_array, create_acc_generic_column, + AccScalarValueColumn, create_acc_generic_column, }, agg::IdxSelection, }, @@ -43,11 +42,17 @@ use crate::{ pub struct AggFirst { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, } impl AggFirst { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { - Ok(Self { child, data_type }) + let acc_array_data_types = vec![data_type.clone(), DataType::Boolean]; + Ok(Self { + child, + data_type, + acc_array_data_types, + }) } } @@ -83,11 +88,15 @@ impl Agg for AggFirst { fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { Box::new(AccFirstColumn { - values: create_acc_generic_column(&self.data_type, num_rows), + values: create_acc_generic_column(self.data_type.clone(), num_rows), flags: AccBooleanColumn::new(num_rows), }) } + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -267,8 +276,7 @@ impl Agg for AggFirst { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - let accs = downcast_any!(accs, mut AccFirstColumn)?; - acc_generic_column_to_array(&mut accs.values, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } @@ -312,19 +320,19 @@ impl AccColumn for AccFirstColumn { self.values.mem_used() + self.flags.mem_used() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - self.values.freeze_to_rows(idx, array)?; - self.flags.freeze_to_rows(idx, array)?; - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let value_array = self.values.freeze_to_arrays(idx)?[0].clone(); + let flags_array = self.flags.freeze_to_arrays(idx)?[0].clone(); + Ok(vec![value_array, flags_array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.values.unfreeze_from_rows(cursors)?; - self.flags.unfreeze_from_rows(cursors)?; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + self.values.unfreeze_from_arrays(&arrays[0..1])?; + self.flags.unfreeze_from_arrays(&arrays[1..2])?; Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { self.values.spill(idx, w)?; self.flags.spill(idx, w)?; Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs index 03b2ddec..b0c79c0e 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs @@ -28,7 +28,7 @@ use crate::{ Agg, acc::{ AccBooleanColumn, AccBytes, AccBytesColumn, AccColumnRef, AccPrimColumn, - AccScalarValueColumn, acc_generic_column_to_array, create_acc_generic_column, + AccScalarValueColumn, create_acc_generic_column, }, agg::IdxSelection, }, @@ -38,11 +38,17 @@ use crate::{ pub struct AggFirstIgnoresNull { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, } impl AggFirstIgnoresNull { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { - Ok(Self { child, data_type }) + let acc_array_data_types = vec![data_type.clone()]; + Ok(Self { + child, + data_type, + acc_array_data_types, + }) } } @@ -77,7 +83,11 @@ impl Agg for AggFirstIgnoresNull { } fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { - create_acc_generic_column(&self.data_type, num_rows) + create_acc_generic_column(self.data_type.clone(), num_rows) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types } fn partial_update( @@ -217,6 +227,6 @@ impl Agg for AggFirstIgnoresNull { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - acc_generic_column_to_array(accs, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs index ea726067..c9cb564c 100644 --- a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs +++ b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs @@ -30,7 +30,7 @@ use crate::{ Agg, acc::{ AccBooleanColumn, AccBytes, AccBytesColumn, AccColumn, AccColumnRef, AccPrimColumn, - AccScalarValueColumn, acc_generic_column_to_array, create_acc_generic_column, + AccScalarValueColumn, create_acc_generic_column, }, agg::IdxSelection, }, @@ -43,14 +43,17 @@ pub type AggMin = AggMaxMin<AggMinParams>; pub struct AggMaxMin<P: AggMaxMinParams> { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, _phantom: PhantomData<P>, } impl<P: AggMaxMinParams> AggMaxMin<P> { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { + let acc_array_data_types = vec![data_type.clone()]; Ok(Self { child, data_type, + acc_array_data_types, _phantom: Default::default(), }) } @@ -87,7 +90,11 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> { } fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { - create_acc_generic_column(&self.data_type, num_rows) + create_acc_generic_column(self.data_type.clone(), num_rows) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types } fn partial_update( @@ -286,7 +293,7 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - acc_generic_column_to_array(accs, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs index 15dd82bf..55ecd7cd 100644 --- a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs +++ b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs @@ -21,7 +21,9 @@ use std::{ }; use arrow::{ - array::{Array, ArrayRef, StructArray, as_struct_array, make_array}, + array::{ + Array, ArrayRef, BinaryArray, BinaryBuilder, StructArray, as_struct_array, make_array, + }, datatypes::{DataType, Field, Schema, SchemaRef}, ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi}, record_batch::{RecordBatch, RecordBatchOptions}, @@ -238,6 +240,10 @@ impl Agg for SparkUDAFWrapper { Box::new(AccUDAFBufferRowsColumn { obj, jcontext }) } + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Binary] + } + fn with_new_exprs(&self, _exprs: Vec<PhysicalExprRef>) -> Result<Arc<dyn Agg>> { Ok(Arc::new(Self::try_new( self.serialized.clone(), @@ -289,12 +295,11 @@ pub struct AccUDAFBufferRowsColumn { } impl AccUDAFBufferRowsColumn { - pub fn freeze_to_rows_with_indices_cache( + pub fn freeze_to_array_with_indices_cache( &self, idx: IdxSelection<'_>, - array: &mut [Vec<u8>], cache: &OnceCell<LocalRef>, - ) -> Result<()> { + ) -> Result<ArrayRef> { let idx_array = cache.get_or_try_init(move || jni_new_prim_array!(int, &idx.to_int32_vec()[..]))?; let serialized = jni_call!( @@ -307,15 +312,18 @@ impl AccUDAFBufferRowsColumn { jni_get_byte_array_region!(serialized.as_obj(), 0, &mut serialized_bytes[..])?; // UnsafeRow is serialized with big-endian i32 length prefix - let mut cursor = Cursor::new(&serialized_bytes); - for i in 0..array.len() { - let mut bytes_len_buf = [0; 4]; - cursor.read_exact(&mut bytes_len_buf)?; + let mut serialized_pos = 0; + let mut binary_builder = BinaryBuilder::with_capacity(idx.len(), 0); + for i in 0..idx.len() { + let mut bytes_len_buf = [0u8; 4]; + bytes_len_buf.copy_from_slice(&serialized_bytes[serialized_pos..][..4]); let bytes_len = i32::from_be_bytes(bytes_len_buf) as usize; - write_len(bytes_len, &mut array[i])?; - std::io::copy(&mut (&mut cursor).take(bytes_len as u64), &mut array[i])?; + serialized_pos += 4; + + binary_builder.append_value(&serialized_bytes[serialized_pos..][..bytes_len]); + serialized_pos += bytes_len; } - Ok(()) + Ok(Arc::new(binary_builder.finish())) } pub fn spill_with_indices_cache( @@ -389,12 +397,20 @@ impl AccColumn for AccUDAFBufferRowsColumn { 0 // memory is managed in jvm side } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - self.freeze_to_rows_with_indices_cache(idx, array, &OnceCell::new()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.freeze_to_array_with_indices_cache(idx, &OnceCell::new())?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { assert_eq!(self.num_records(), 0, "expect empty AccColumn"); + let array = downcast_any!(arrays[0], BinaryArray)?; + + let mut cursors = vec![]; + for i in 0..array.len() { + cursors.push(Cursor::new(array.value(i))); + } + let mut data = vec![]; for cursor in cursors.iter_mut() { let bytes_len = read_len(cursor)?; @@ -414,7 +430,7 @@ impl AccColumn for AccUDAFBufferRowsColumn { Ok(()) } - fn spill(&self, _idx: IdxSelection<'_>, _buf: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, _idx: IdxSelection<'_>, _buf: &mut SpillCompressedWriter) -> Result<()> { unimplemented!("should call spill_with_indices_cache instead") } diff --git a/native-engine/datafusion-ext-plans/src/agg/sum.rs b/native-engine/datafusion-ext-plans/src/agg/sum.rs index 7bae0e11..0b60107a 100644 --- a/native-engine/datafusion-ext-plans/src/agg/sum.rs +++ b/native-engine/datafusion-ext-plans/src/agg/sum.rs @@ -26,9 +26,7 @@ use datafusion_ext_commons::{df_unimplemented_err, downcast_any}; use crate::{ agg::{ Agg, - acc::{ - AccColumnRef, AccPrimColumn, acc_generic_column_to_array, create_acc_generic_column, - }, + acc::{AccColumnRef, AccPrimColumn, create_acc_generic_column}, agg::IdxSelection, }, idx_for_zipped, @@ -37,11 +35,17 @@ use crate::{ pub struct AggSum { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, } impl AggSum { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { - Ok(Self { child, data_type }) + let acc_array_data_types = vec![data_type.clone()]; + Ok(Self { + child, + data_type, + acc_array_data_types, + }) } } @@ -84,7 +88,11 @@ impl Agg for AggSum { } fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { - create_acc_generic_column(&self.data_type, num_rows) + create_acc_generic_column(self.data_type.clone(), num_rows) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types } fn partial_update( @@ -145,6 +153,6 @@ impl Agg for AggSum { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - acc_generic_column_to_array(accs, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala index 7e4b9d6f..99f9247b 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala @@ -17,16 +17,12 @@ package org.apache.spark.sql.execution.auron.plan import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.ExprId import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.expressions.aggregate.Final import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate.BaseAggregateExec import org.apache.spark.sql.execution.auron.plan.NativeAggBase.AggExecMode -import org.apache.spark.sql.types.BinaryType import org.apache.auron.sparkver @@ -55,15 +51,6 @@ case class NativeAggExec( @sparkver("3.3 / 3.4 / 3.5") override val initialInputBufferOffset: Int = theInitialInputBufferOffset - override def output: Seq[Attribute] = - if (aggregateExpressions.map(_.mode).contains(Final)) { - groupingExpressions.map(_.toAttribute) ++ aggregateAttributes - } else { - groupingExpressions.map(_.toAttribute) :+ - AttributeReference(NativeAggBase.AGG_BUF_COLUMN_NAME, BinaryType, nullable = false)( - ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) - } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") override def isStreaming: Boolean = false diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 4f124bd8..de0b6105 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -461,7 +461,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -472,7 +472,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -480,7 +480,7 @@ object AuronConverters extends Logging { case p => throw new NotImplementedError( s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse( - "unknown")}, class: ${p.getClass.getName}") + "unknown")}, class: ${p.getClass.getName}") } } @@ -836,8 +836,7 @@ object AuronConverters extends Logging { addRenameColumnsExec(convertToNative(exec.child)) case _ => if (needRenameColumns(exec.child)) { - val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) :+ - NativeAggBase.AGG_BUF_COLUMN_NAME + val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) Shims.get.createNativeRenameColumnsExec(convertToNative(exec.child), newNames) } else { convertToNative(exec.child) @@ -893,8 +892,7 @@ object AuronConverters extends Logging { addRenameColumnsExec(convertToNative(exec.child)) case _ => if (needRenameColumns(exec.child)) { - val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) :+ - NativeAggBase.AGG_BUF_COLUMN_NAME + val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) Shims.get.createNativeRenameColumnsExec(convertToNative(exec.child), newNames) } else { convertToNative(exec.child) @@ -947,8 +945,7 @@ object AuronConverters extends Logging { addRenameColumnsExec(convertToNative(child)) case _ => if (needRenameColumns(child)) { - val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) :+ - NativeAggBase.AGG_BUF_COLUMN_NAME + val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) Shims.get.createNativeRenameColumnsExec(convertToNative(child), newNames) } else { convertToNative(child) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index 1f8b6421..9944ea5b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -96,7 +96,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu dumpSimpleSparkPlanTreeNode(sparkPlanTransformed) logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed - .treeString(verbose = true, addSuffix = true)}") + .treeString(verbose = true, addSuffix = true)}") // post-transform Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index 3ae7669e..403008e6 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -74,7 +74,7 @@ object NativeHelper extends Logging { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( - heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala index 8e5d7353..eecec5ef 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala @@ -47,7 +47,7 @@ object TaskContextHelper extends Logging { val thread = Thread.currentThread() val threadName = if (context != null) { s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context - .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" + .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" } else { "auron native task " + thread.getName } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala index 54e2cddc..f595dacc 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala @@ -52,8 +52,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeAggBase.SortAgg import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.BinaryType -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, LongType} import org.apache.auron.{protobuf => pb} import org.apache.auron.jni.AuronAdaptor @@ -146,9 +145,10 @@ abstract class NativeAggBase( if (nativeAggrModes.contains(pb.AggMode.FINAL)) { groupingExpressions.map(_.toAttribute) ++ aggregateAttributes } else { - groupingExpressions.map(_.toAttribute) :+ - AttributeReference(NativeAggBase.AGG_BUF_COLUMN_NAME, BinaryType, nullable = false)( - ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) + val aggBufferAttrs = nativeAggrInfos + .flatMap(_.aggBufferDataTypes) + .map(AttributeReference("", _, nullable = false)(ExprId.apply(0))) + groupingExpressions.map(_.toAttribute) ++ aggBufferAttrs } override def outputPartitioning: Partitioning = @@ -209,38 +209,18 @@ abstract class NativeAggBase( } object NativeAggBase extends Logging { - - val AGG_BUF_COLUMN_EXPR_ID = 9223372036854775807L - val AGG_BUF_COLUMN_NAME = s"#$AGG_BUF_COLUMN_EXPR_ID" - trait AggExecMode; case object HashAgg extends AggExecMode case object SortAgg extends AggExecMode - case class NativeAggrPartialState(stateAttr: Attribute, arrowType: pb.ArrowType) - - object NativeAggrPartialState { - def apply( - aggrAttr: Attribute, - stateFieldName: String, - dataType: DataType, - nullable: Boolean, - arrowType: pb.ArrowType = null): NativeAggrPartialState = { - - val fieldName = s"${Util.getFieldNameByExprId(aggrAttr)}[$stateFieldName]" - val stateAttr = AttributeReference(fieldName, dataType, nullable)(aggrAttr.exprId) - NativeAggrPartialState( - stateAttr, - arrowType = Option(arrowType).getOrElse(NativeConverters.convertDataType(dataType))) - } - } - case class NativeAggrInfo( mode: AggregateMode, nativeAggrs: Seq[pb.PhysicalExprNode], + aggBufferDataTypes: Seq[DataType], outputAttr: Attribute) def getNativeAggrInfo(aggr: AggregateExpression, aggrAttr: Attribute): NativeAggrInfo = { + val aggBufferDataTypes = computeNativeAggBufferDataTypes(aggr.aggregateFunction) val reducedAggr = AggregateExpression( aggr.aggregateFunction .mapChildren(e => createPlaceholder(e)) @@ -253,12 +233,17 @@ object NativeAggBase extends Logging { aggr.mode match { case Partial => - NativeAggrInfo(aggr.mode, NativeConverters.convertAggregateExpr(aggr) :: Nil, outputAttr) + NativeAggrInfo( + aggr.mode, + NativeConverters.convertAggregateExpr(aggr) :: Nil, + aggBufferDataTypes, + outputAttr) case PartialMerge | Final => NativeAggrInfo( aggr.mode, NativeConverters.convertAggregateExpr(reducedAggr) :: Nil, + aggBufferDataTypes, outputAttr) case Complete => @@ -310,4 +295,17 @@ object NativeAggBase extends Logging { } findRecursive(exec.children.head) } + + def computeNativeAggBufferDataTypes(aggr: AggregateFunction): Seq[DataType] = { + aggr match { + case _: Count => Seq(LongType) + case f: Max => Seq(f.dataType) + case f: Min => Seq(f.dataType) + case f: Sum => Seq(f.dataType) + case f: Average => Seq(f.dataType, LongType) + case f @ First(_, true) => Seq(f.dataType) + case f @ First(_, false) => Seq(f.dataType, BooleanType) + case _ => Seq(BinaryType) + } + } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index d43f7d17..b9fb7f99 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase( .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) .toSeq :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) - :+ ("bytes_written", - SQLMetrics - .createSizeMetric(sparkContext, "Native.bytes_written")): _*) + :+ ( + "bytes_written", + SQLMetrics + .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = { val hadoopConf = sparkContext.hadoopConfiguration
