This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-v6.0.0-parallel-scan-kdev-build
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 3fd3ba873251eed3125aa5569714075f8acfa92b Author: zhangli20 <[email protected]> AuthorDate: Wed Jan 21 17:48:44 2026 +0800 Implement columnar aggregate buffers --- native-engine/datafusion-ext-plans/src/agg/acc.rs | 210 +++++++++------------ native-engine/datafusion-ext-plans/src/agg/agg.rs | 1 + .../datafusion-ext-plans/src/agg/agg_ctx.rs | 84 +++++---- .../datafusion-ext-plans/src/agg/agg_table.rs | 12 +- native-engine/datafusion-ext-plans/src/agg/avg.rs | 23 ++- .../datafusion-ext-plans/src/agg/bloom_filter.rs | 43 +++-- .../src/agg/brickhouse/collect.rs | 4 + .../src/agg/brickhouse/combine_unique.rs | 4 + .../datafusion-ext-plans/src/agg/collect.rs | 40 +++- .../datafusion-ext-plans/src/agg/count.rs | 143 +++----------- .../datafusion-ext-plans/src/agg/first.rs | 36 ++-- .../src/agg/first_ignores_null.rs | 18 +- .../datafusion-ext-plans/src/agg/maxmin.rs | 13 +- .../src/agg/spark_udaf_wrapper.rs | 51 +++-- native-engine/datafusion-ext-plans/src/agg/sum.rs | 20 +- .../sql/execution/blaze/plan/NativeAggExec.scala | 18 +- .../spark/sql/auron/util/TaskContextHelper.scala | 135 +++++++++++++ .../apache/spark/sql/blaze/BlazeConverters.scala | 11 +- .../sql/blaze/BlazeSparkSessionExtension.scala | 2 +- .../org/apache/spark/sql/blaze/NativeHelper.scala | 3 +- .../sql/execution/blaze/plan/NativeAggBase.scala | 53 +++--- 21 files changed, 512 insertions(+), 412 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs b/native-engine/datafusion-ext-plans/src/agg/acc.rs index 7d8530a8..fe363e05 100644 --- a/native-engine/datafusion-ext-plans/src/agg/acc.rs +++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs @@ -14,7 +14,7 @@ use std::{ any::Any, - io::{Cursor, Read, Write}, + io::{Read, Write}, sync::Arc, }; @@ -23,7 +23,7 @@ use arrow::{ datatypes::{DataType, *}, }; use bitvec::{bitvec, vec::BitVec}; -use byteorder::{ReadBytesExt, WriteBytesExt}; +use byteorder::WriteBytesExt; use datafusion::common::{Result, ScalarValue, 
utils::proxy::VecAllocExt}; use datafusion_ext_commons::{ SliceAsRawBytes, UninitializedInit, df_execution_err, downcast_any, @@ -45,9 +45,9 @@ pub trait AccColumn: Send { fn shrink_to_fit(&mut self); fn num_records(&self) -> usize; fn mem_used(&self) -> usize; - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()>; - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()>; - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()>; + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>>; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()>; + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()>; fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()>; fn ensure_size(&mut self, idx: IdxSelection<'_>) { @@ -139,8 +139,7 @@ impl AccBooleanColumn { } } - pub fn to_array(&self, dt: &DataType, idx: IdxSelection<'_>) -> Result<ArrayRef> { - assert!(dt == &DataType::Boolean); + pub fn to_array(&self, idx: IdxSelection<'_>) -> Result<ArrayRef> { idx_with_iter!((idx @ idx) => { Ok(Arc::new(BooleanArray::from_iter( idx.map(|i| self.valids[i].then_some(self.values[i])) @@ -176,38 +175,32 @@ impl AccColumn for AccBooleanColumn { self.num_records() / 4 // 2 bits for each value } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - if self.valids[i] { - w.write_u8(1 + self.values[i] as u8)?; - } else { - w.write_u8(0)?; - } - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(idx)?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.resize(0); + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(&arrays[0], 
BooleanArray)?; + self.values.clear(); + self.valids.clear(); - for cursor in cursors { - match cursor.read_u8()? { - 0 => { - self.valids.push(false); - self.values.push(false); - } - v => { - self.valids.push(true); - self.values.push(v - 1 != 0); - } + // fill values + for i in 0..array.len() { + self.values.push(array.value(i)); + } + + // fill valids + if let Some(nb) = array.nulls() { + for bit in nb { + self.valids.push(bit); } } + self.valids.resize(self.values.len(), true); Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { let mut buf = vec![]; idx_for! { @@ -242,13 +235,15 @@ impl AccColumn for AccBooleanColumn { pub struct AccPrimColumn<T: ArrowNativeType> { values: Vec<T>, valids: BitVec, + dt: DataType, } impl<T: ArrowNativeType> AccPrimColumn<T> { - pub fn new(num_records: usize) -> Self { + pub fn new(num_records: usize, dt: DataType) -> Self { Self { values: vec![T::default(); num_records], valids: bitvec![0; num_records], + dt, } } @@ -331,39 +326,30 @@ impl<T: ArrowNativeType> AccColumn for AccPrimColumn<T> { self.values.allocated_size() + (self.valids.capacity() + 7) / 8 } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - if self.valids[i] { - w.write_u8(1)?; - w.write_all([self.values[i]].as_raw_bytes())?; - } else { - w.write_u8(0)?; - } - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(&self.dt, idx)?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array_data = arrays[0].to_data(); self.resize(0); - let mut value_buf = [T::default()]; - - for cursor in cursors { - let valid = 
cursor.read_u8()?; - if valid == 1 { - cursor.read_exact(value_buf.as_raw_bytes_mut())?; - self.values.push(value_buf[0]); - self.valids.push(true); - } else { - self.values.push(T::default()); - self.valids.push(false); + + // fill values + let buffer = array_data.buffer::<T>(0); + self.values = buffer[array_data.offset()..][..array_data.len()].to_vec(); + + // fill valids + if let Some(nb) = array_data.nulls() { + for bit in nb { + self.valids.push(bit); } } + self.valids.resize(self.values.len(), true); Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { // write valids let mut bits: BitVec<u8> = BitVec::with_capacity(idx.len()); idx_for! { @@ -413,13 +399,15 @@ impl<T: ArrowNativeType> AccColumn for AccPrimColumn<T> { pub struct AccBytesColumn { items: Vec<Option<AccBytes>>, heap_mem_used: usize, + dt: DataType, } impl AccBytesColumn { - pub fn new(num_records: usize) -> Self { + pub fn new(num_records: usize, dt: DataType) -> Self { Self { items: vec![None; num_records], heap_mem_used: 0, + dt, } } @@ -438,13 +426,13 @@ impl AccBytesColumn { self.heap_mem_used += self.item_heap_mem_used(idx); } - fn to_array(&self, dt: &DataType, idx: IdxSelection<'_>) -> Result<ArrayRef> { + fn to_array(&self, idx: IdxSelection<'_>) -> Result<ArrayRef> { let binary; idx_with_iter!((idx @ idx) => { binary = BinaryArray::from_iter(idx.map(|i| self.items[i].as_ref())); }); - match dt { + match &self.dt { DataType::Utf8 => Ok(make_array( binary .to_data() @@ -453,7 +441,7 @@ impl AccBytesColumn { .build()?, )), DataType::Binary => Ok(Arc::new(binary)), - _ => df_execution_err!("expected string or binary type, got {dt:?}"), + dt => df_execution_err!("expected string or binary type, got {dt:?}"), } } @@ -534,25 +522,34 @@ impl AccColumn for AccBytesColumn { self.heap_mem_used + self.items.allocated_size() } - fn freeze_to_rows(&self, idx: 
IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - self.save_value(i, w)?; - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(idx)?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.items.resize(0, Default::default()); - for cursor in cursors { - self.load_value(cursor)?; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array_data = arrays[0].to_data(); + self.items.clear(); + + // fill values + let data_buffer = array_data.buffer::<u8>(0); + let offset_buffer = array_data.buffer::<i32>(1); + for i in 0..array_data.len() { + if array_data.is_valid(i) { + let offset_begin = offset_buffer[array_data.offset() + i] as usize; + let offset_end = offset_buffer[array_data.offset() + i + 1] as usize; + self.items.push(Some(AccBytes::from_slice( + &data_buffer[offset_begin..offset_end], + ))); + } else { + self.items.push(None); + } } self.refresh_heap_mem_used(); Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! 
{ (idx in idx) => { self.save_value(idx, w)?; @@ -578,17 +575,17 @@ pub struct AccScalarValueColumn { } impl AccScalarValueColumn { - pub fn new(dt: &DataType, num_rows: usize) -> Self { - let null_value = ScalarValue::try_from(dt).expect("unsupported data type"); + pub fn new(dt: DataType, num_rows: usize) -> Self { + let null_value = ScalarValue::try_from(&dt).expect("unsupported data type"); Self { items: (0..num_rows).map(|_| null_value.clone()).collect(), - dt: dt.clone(), + dt, null_value, heap_mem_used: 0, } } - pub fn to_array(&mut self, _dt: &DataType, idx: IdxSelection<'_>) -> Result<ArrayRef> { + pub fn to_array(&mut self, idx: IdxSelection<'_>) -> Result<ArrayRef> { idx_with_iter!((idx @ idx) => { ScalarValue::iter_to_array(idx.map(|i| { std::mem::replace(&mut self.items[i], self.null_value.clone()) @@ -644,38 +641,36 @@ impl AccColumn for AccScalarValueColumn { self.heap_mem_used + self.items.allocated_size() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - idx_with_iter!((idx @ idx) => { - for (i, w) in idx.zip(array) { - write_scalar(&self.items[i], true, w)?; - } - }); - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.to_array(idx)?; // data type is not used + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.items.truncate(0); + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = &arrays[0]; + + self.items.clear(); self.heap_mem_used = 0; - for cursor in cursors { - let scalar = read_scalar(cursor, &self.dt, true)?; + for i in 0..array.len() { + let scalar = ScalarValue::try_from_array(array, i)?; self.heap_mem_used += scalar_value_heap_mem_size(&scalar); - self.items.push(scalar); } Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> { idx_for! { (idx in idx) => { - write_scalar(&self.items[idx], true, w)?; + let scalar = self.take_value(idx); + write_scalar(&scalar, true, w)?; } } Ok(()) } fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - self.items.truncate(0); + self.items.clear(); self.heap_mem_used = 0; for _ in 0..num_rows { @@ -687,43 +682,18 @@ impl AccColumn for AccScalarValueColumn { } } -pub fn create_acc_generic_column(dt: &DataType, num_rows: usize) -> AccColumnRef { +pub fn create_acc_generic_column(dt: DataType, num_rows: usize) -> AccColumnRef { macro_rules! primitive_helper { ($t:ty) => { Box::new(AccPrimColumn::<<$t as ArrowPrimitiveType>::Native>::new( - num_rows, + num_rows, dt, )) }; } downcast_primitive! { dt => (primitive_helper), DataType::Boolean => Box::new(AccBooleanColumn::new(num_rows)), - DataType::Utf8 | DataType::Binary => Box::new(AccBytesColumn::new(num_rows)), + DataType::Utf8 | DataType::Binary => Box::new(AccBytesColumn::new(num_rows, dt)), other => Box::new(AccScalarValueColumn::new(other, num_rows)), } } - -pub fn acc_generic_column_to_array( - column: &mut AccColumnRef, - dt: &DataType, - idx: IdxSelection<'_>, -) -> Result<ArrayRef> { - macro_rules! primitive_helper { - ($t:ty) => { - downcast_any!(column, mut AccPrimColumn::<<$t as ArrowPrimitiveType>::Native>)? - .to_array(dt, idx) - }; - } - downcast_primitive! 
{ - dt => (primitive_helper), - DataType::Boolean => { - downcast_any!(column, mut AccBooleanColumn)?.to_array(dt, idx) - } - DataType::Utf8 | DataType::Binary => { - downcast_any!(column, mut AccBytesColumn)?.to_array(dt, idx) - } - _other => { - downcast_any!(column, mut AccScalarValueColumn)?.to_array(dt, idx) - } - } -} diff --git a/native-engine/datafusion-ext-plans/src/agg/agg.rs b/native-engine/datafusion-ext-plans/src/agg/agg.rs index 1e906143..82ab45d0 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg.rs @@ -43,6 +43,7 @@ pub trait Agg: Send + Sync + Debug { fn data_type(&self) -> &DataType; fn nullable(&self) -> bool; fn create_acc_column(&self, num_rows: usize) -> AccColumnRef; + fn acc_array_data_types(&self) -> &[DataType]; fn with_new_exprs(&self, exprs: Vec<PhysicalExprRef>) -> Result<Arc<dyn Agg>>; fn prepare_partial_args(&self, partial_inputs: &[ArrayRef]) -> Result<Vec<ArrayRef>> { diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs b/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs index 1efa7cd3..c34fe0eb 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs @@ -14,13 +14,12 @@ use std::{ fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; use arrow::{ - array::{ArrayRef, BinaryArray, RecordBatchOptions}, - datatypes::{DataType, Field, Fields, Schema, SchemaRef}, + array::{Array, ArrayRef, RecordBatchOptions}, + datatypes::{Field, Fields, Schema, SchemaRef}, record_batch::RecordBatch, row::{RowConverter, Rows, SortField}, }; @@ -28,20 +27,16 @@ use blaze_jni_bridge::{ conf, conf::{BooleanConf, DoubleConf, IntConf}, }; -use datafusion::{ - common::{Result, cast::as_binary_array}, - physical_expr::PhysicalExprRef, -}; +use datafusion::{common::Result, physical_expr::PhysicalExprRef}; use datafusion_ext_commons::{downcast_any, suggested_batch_mem_size}; use once_cell::sync::OnceCell; use 
parking_lot::Mutex; use crate::{ agg::{ - AGG_BUF_COLUMN_NAME, AggExecMode, AggExpr, AggMode, GroupingExpr, + AggExecMode, AggExpr, AggMode, GroupingExpr, acc::AccTable, agg::{Agg, IdxSelection}, - agg_hash_map::AggHashMapKey, spark_udaf_wrapper::{AccUDAFBufferRowsColumn, SparkUDAFMemTracker, SparkUDAFWrapper}, }, common::{ @@ -62,6 +57,8 @@ pub struct AggContext { pub grouping_row_converter: Arc<Mutex<RowConverter>>, pub groupings: Vec<GroupingExpr>, pub aggs: Vec<AggExpr>, + pub input_acc_arrays_len: usize, + pub output_acc_arrays_len: usize, pub supports_partial_skipping: bool, pub partial_skipping_ratio: f64, pub partial_skipping_min_rows: usize, @@ -136,7 +133,11 @@ impl AggContext { )); } } else { - agg_fields.push(Field::new(AGG_BUF_COLUMN_NAME, DataType::Binary, false)); + for agg in &aggs { + for dt in agg.agg.acc_array_data_types() { + agg_fields.push(Field::new("", dt.clone(), true)); + } + } } let agg_schema = Arc::new(Schema::new(agg_fields)); let output_schema = Arc::new(Schema::new( @@ -147,6 +148,17 @@ impl AggContext { .concat(), )); + let input_acc_arrays_len = aggs + .iter() + .filter(|agg| agg.mode.is_partial_merge() || agg.mode.is_final()) + .map(|agg| agg.agg.acc_array_data_types().len()) + .sum(); + let output_acc_arrays_len = aggs + .iter() + .filter(|agg| agg.mode.is_partial() || agg.mode.is_partial_merge()) + .map(|agg| agg.agg.acc_array_data_types().len()) + .sum(); + let agg_exprs_flatten: Vec<PhysicalExprRef> = aggs .iter() .filter(|agg| agg.mode.is_partial()) @@ -194,6 +206,8 @@ impl AggContext { grouping_row_converter, groupings, aggs, + input_acc_arrays_len, + output_acc_arrays_len, agg_expr_evaluator, supports_partial_skipping, partial_skipping_ratio, @@ -276,21 +290,15 @@ impl AggContext { let mut merging_acc_table = self.create_acc_table(0); if self.need_partial_merge { - let partial_merged_array = as_binary_array(batch.columns().last().unwrap())?; - let array = partial_merged_array - .iter() - .skip(batch_start_idx) - 
.take(batch_end_idx - batch_start_idx) - .map(|bytes| bytes.unwrap()) - .collect::<Vec<_>>(); - let mut cursors = array - .iter() - .map(|bytes| Cursor::new(bytes.as_bytes())) - .collect::<Vec<_>>(); - - for (agg_idx, _agg) in &self.need_partial_merge_aggs { - let acc_col = &mut merging_acc_table.cols_mut()[*agg_idx]; - acc_col.unfreeze_from_rows(&mut cursors)?; + let mut acc_arrays_start = batch.num_columns() - self.input_acc_arrays_len; + for (agg_idx, agg) in &self.need_partial_merge_aggs { + let acc_arrays = &batch.columns()[acc_arrays_start..] + .iter() + .take(agg.acc_array_data_types().len()) + .map(|array| array.slice(batch_start_idx, batch_end_idx - batch_start_idx)) + .collect::<Vec<_>>(); + acc_arrays_start += agg.acc_array_data_types().len(); + merging_acc_table.cols_mut()[*agg_idx].unfreeze_from_arrays(&acc_arrays)?; } } let batch_selection = IdxSelection::Range(0, batch_end_idx - batch_start_idx); @@ -318,9 +326,7 @@ impl AggContext { } Ok(agg_columns) } else { - // output acc as a binary column - let freezed = self.freeze_acc_table(acc_table, idx)?; - Ok(vec![Arc::new(BinaryArray::from_iter_values(freezed))]) + self.freeze_acc_table(acc_table, idx) } } @@ -405,23 +411,23 @@ impl AggContext { pub fn freeze_acc_table( &self, - acc_table: &AccTable, + acc_table: &mut AccTable, acc_idx: IdxSelection, - ) -> Result<Vec<Vec<u8>>> { + ) -> Result<Vec<ArrayRef>> { let udaf_indices_cache = OnceCell::new(); - let mut vec = vec![vec![]; acc_idx.len()]; - for acc_col in acc_table.cols() { + let mut arrays = vec![]; + + for acc_col in acc_table.cols_mut() { if let Ok(udaf_acc_col) = downcast_any!(acc_col, AccUDAFBufferRowsColumn) { - udaf_acc_col.freeze_to_rows_with_indices_cache( - acc_idx, - &mut vec, - &udaf_indices_cache, - )?; + arrays.push( + udaf_acc_col + .freeze_to_array_with_indices_cache(acc_idx, &udaf_indices_cache)?, + ); } else { - acc_col.freeze_to_rows(acc_idx, &mut vec)?; + arrays.extend(acc_col.freeze_to_arrays(acc_idx)?); } } - Ok(vec) + 
Ok(arrays) } pub async fn process_partial_skipped( diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs index 3eeaf651..861f94b1 100644 --- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs +++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs @@ -543,7 +543,7 @@ impl HashingData { // sort all records using radix sort on hashcodes of keys let num_spill_buckets = self.agg_ctx.num_spill_buckets(self.mem_used()); let key_rows = self.map.into_keys(); - let acc_table = self.acc_table; + let mut acc_table = self.acc_table; let mut entries = key_rows .iter() .enumerate() @@ -567,7 +567,7 @@ impl HashingData { write_spill_bucket( &mut writer, &self.agg_ctx, - &acc_table, + &mut acc_table, entries[offset..][..cur_bucket_count] .iter() .map(|&(_, record_idx)| &key_rows[record_idx as usize]), @@ -673,7 +673,7 @@ impl MergingData { entries.shrink_to_fit(); let key_rows = self.key_rows; - let acc_table = self.acc_table; + let mut acc_table = self.acc_table; let mut bucket_counts = vec![0; num_spill_buckets]; radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| { @@ -692,7 +692,7 @@ impl MergingData { write_spill_bucket( &mut writer, &self.agg_ctx, - &acc_table, + &mut acc_table, entries[offset..][..cur_bucket_count].iter().map( |&(_, batch_idx, row_idx, _)| { key_rows[batch_idx as usize] @@ -721,7 +721,7 @@ impl MergingData { fn write_spill_bucket( w: &mut SpillCompressedWriter, agg_ctx: &AggContext, - acc_table: &AccTable, + acc_table: &mut AccTable, key_iter: impl Iterator<Item = impl AsRef<[u8]>>, acc_idx_iter: impl Iterator<Item = usize>, spill_idx: usize, @@ -729,7 +729,7 @@ fn write_spill_bucket( // write accs let udaf_indices_cache = OnceCell::new(); let acc_indices: Vec<usize> = acc_idx_iter.collect(); - for col in acc_table.cols() { + for col in acc_table.cols_mut() { if let Ok(udaf_col) = downcast_any!(col, AccUDAFBufferRowsColumn) { 
udaf_col.spill_with_indices_cache( IdxSelection::Indices(&acc_indices), diff --git a/native-engine/datafusion-ext-plans/src/agg/avg.rs b/native-engine/datafusion-ext-plans/src/agg/avg.rs index 1298737b..fb040468 100644 --- a/native-engine/datafusion-ext-plans/src/agg/avg.rs +++ b/native-engine/datafusion-ext-plans/src/agg/avg.rs @@ -15,7 +15,6 @@ use std::{ any::Any, fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; @@ -45,6 +44,7 @@ pub struct AggAvg { data_type: DataType, agg_sum: AggSum, agg_count: AggCount, + acc_array_data_types: Vec<DataType>, } impl AggAvg { @@ -52,6 +52,7 @@ impl AggAvg { let agg_sum = AggSum::try_new(child.clone(), data_type.clone())?; let agg_count = AggCount::try_new(vec![child.clone()], DataType::Int64)?; Ok(Self { + acc_array_data_types: vec![data_type.clone(), DataType::Int64], child, data_type, agg_sum, @@ -105,6 +106,10 @@ impl Agg for AggAvg { }) } + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -204,19 +209,19 @@ impl AccColumn for AccAvgColumn { self.sum.mem_used() + self.count.mem_used() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - self.sum.freeze_to_rows(idx, array)?; - self.count.freeze_to_rows(idx, array)?; - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let sum_array = self.sum.freeze_to_arrays(idx)?[0].clone(); + let count_array = self.count.freeze_to_arrays(idx)?[0].clone(); + Ok(vec![sum_array, count_array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.sum.unfreeze_from_rows(cursors)?; - self.count.unfreeze_from_rows(cursors)?; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + self.sum.unfreeze_from_arrays(&arrays[0..1])?; + self.count.unfreeze_from_arrays(&arrays[1..2])?; Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, buf: &mut SpillCompressedWriter) -> 
Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, buf: &mut SpillCompressedWriter) -> Result<()> { self.sum.spill(idx, buf)?; self.count.spill(idx, buf)?; Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs index 5e872970..f061ff60 100644 --- a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs +++ b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs @@ -20,7 +20,7 @@ use std::{ }; use arrow::{ - array::{ArrayRef, AsArray, BinaryBuilder}, + array::{ArrayRef, AsArray, BinaryArray, BinaryBuilder}, datatypes::{DataType, Int64Type}, }; use byteorder::{ReadBytesExt, WriteBytesExt}; @@ -107,6 +107,10 @@ impl Agg for AggBloomFilter { bloom_filters } + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Binary] + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -244,36 +248,41 @@ impl AccColumn for AccBloomFilterColumn { .sum() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let mut array = vec![]; + idx_for! { (idx in idx) => { - let w = &mut array[idx]; if let Some(bloom_filter) = &self.bloom_filters[idx] { + let mut w = vec![]; w.write_u8(1)?; - bloom_filter.write_to(w)?; + bloom_filter.write_to(&mut w)?; + array.push(Some(w)); } else { - w.write_u8(0)?; + array.push(None); } } } - Ok(()) + + let array = Arc::new(BinaryArray::from_iter(array)); + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - assert_eq!(self.num_records(), 0, "expect empty AccColumn"); - for r in cursors { - self.bloom_filters.push({ - if r.read_u8()? == 1 { - Some(SparkBloomFilter::read_from(r)?) 
- } else { - None - } - }); + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(arrays[0], BinaryArray)?; + + for v in array { + if let Some(w) = v { + self.bloom_filters + .push(Some(SparkBloomFilter::read_from(&mut Cursor::new(w))?)); + } else { + self.bloom_filters.push(None); + } } Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! { (idx in idx) => { if let Some(bloom_filter) = &self.bloom_filters[idx] { diff --git a/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs b/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs index c9fd88d8..db9da487 100644 --- a/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs @@ -80,6 +80,10 @@ impl Agg for AggCollect { self.innert_collect_list.create_acc_column(num_rows) } + fn acc_array_data_types(&self) -> &[DataType] { + self.innert_collect_list.acc_array_data_types() + } + fn partial_update( &self, accs: &mut AccColumnRef, diff --git a/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs b/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs index 67f3b6ca..ed993504 100644 --- a/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs +++ b/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs @@ -80,6 +80,10 @@ impl Agg for AggCombineUnique { self.inner_collect_set.create_acc_column(num_rows) } + fn acc_array_data_types(&self) -> &[DataType] { + self.inner_collect_set.acc_array_data_types() + } + fn partial_update( &self, accs: &mut AccColumnRef, diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs b/native-engine/datafusion-ext-plans/src/agg/collect.rs index 88fc7270..341aea9d 100644 --- 
a/native-engine/datafusion-ext-plans/src/agg/collect.rs +++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs @@ -115,6 +115,10 @@ impl<C: AccCollectionColumn> Agg for AggGenericCollect<C> { col } + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Binary] + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -291,15 +295,23 @@ impl AccColumn for AccSetColumn { self.mem_used + self.set.capacity() * size_of::<AccSet>() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - AccCollectionColumn::freeze_to_rows(self, idx, array) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let mut array = vec![vec![]; idx.len()]; + AccCollectionColumn::freeze_to_rows(self, idx, &mut array)?; + Ok(vec![Arc::new(BinaryArray::from_iter_values(array))]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - AccCollectionColumn::unfreeze_from_rows(self, cursors) + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(arrays[0], BinaryArray)?; + let mut cursors = vec![]; + + for i in 0..array.len() { + cursors.push(Cursor::new(array.value(i))); + } + AccCollectionColumn::unfreeze_from_rows(self, &mut cursors) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! 
{ (idx in idx) => { self.save_raw(idx, w)?; @@ -403,15 +415,23 @@ impl AccColumn for AccListColumn { self.mem_used + self.list.capacity() * size_of::<AccList>() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - AccCollectionColumn::freeze_to_rows(self, idx, array) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let mut array = vec![vec![]; idx.len()]; + AccCollectionColumn::freeze_to_rows(self, idx, &mut array)?; + Ok(vec![Arc::new(BinaryArray::from_iter_values(array))]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - AccCollectionColumn::unfreeze_from_rows(self, cursors) + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + let array = downcast_any!(arrays[0], BinaryArray)?; + let mut cursors = vec![]; + + for i in 0..array.len() { + cursors.push(Cursor::new(array.value(i))); + } + AccCollectionColumn::unfreeze_from_rows(self, &mut cursors) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { idx_for! 
{ (idx in idx) => { self.save_raw(idx, w)?; diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs b/native-engine/datafusion-ext-plans/src/agg/count.rs index 0ae34a76..61508c77 100644 --- a/native-engine/datafusion-ext-plans/src/agg/count.rs +++ b/native-engine/datafusion-ext-plans/src/agg/count.rs @@ -15,24 +15,19 @@ use std::{ any::Any, fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; use arrow::{array::*, datatypes::*}; use datafusion::{common::Result, physical_expr::PhysicalExprRef}; -use datafusion_ext_commons::{ - downcast_any, - io::{read_len, write_len}, -}; +use datafusion_ext_commons::downcast_any; use crate::{ agg::{ - acc::{AccColumn, AccColumnRef}, + acc::{AccColumn, AccColumnRef, AccPrimColumn}, agg::{Agg, IdxSelection}, }, - idx_for, idx_for_zipped, idx_with_iter, - memmgr::spill::{SpillCompressedReader, SpillCompressedWriter}, + idx_for_zipped, }; pub struct AggCount { @@ -81,9 +76,11 @@ impl Agg for AggCount { } fn create_acc_column(&self, num_rows: usize) -> Box<dyn AccColumn> { - Box::new(AccCountColumn { - values: vec![0; num_rows], - }) + Box::new(AccPrimColumn::<i64>::new(num_rows, DataType::Int64)) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Int64] } fn partial_update( @@ -93,32 +90,15 @@ impl Agg for AggCount { partial_args: &[ArrayRef], partial_arg_idx: IdxSelection<'_>, ) -> Result<()> { - let accs = downcast_any!(accs, mut AccCountColumn)?; + let accs = downcast_any!(accs, mut AccPrimColumn<i64>)?; accs.ensure_size(acc_idx); - if partial_args.is_empty() { - idx_for_zipped! { - ((acc_idx, _partial_arg_idx) in (acc_idx, partial_arg_idx)) => { - if acc_idx >= accs.values.len() { - accs.values.push(1); - } else { - accs.values[acc_idx] += 1; - } - } - } - } else { - idx_for_zipped! 
{ - ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { - let add = partial_args - .iter() - .all(|arg| arg.is_valid(partial_arg_idx)) as i64; - - if acc_idx >= accs.values.len() { - accs.values.push(add); - } else { - accs.values[acc_idx] += add; - } - } + idx_for_zipped! { + ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => { + let add = partial_args + .iter() + .all(|arg| arg.is_valid(partial_arg_idx)) as i64; + accs.set_value(acc_idx, Some(accs.value(acc_idx).unwrap_or(0) + add)); } } Ok(()) @@ -131,98 +111,25 @@ impl Agg for AggCount { merging_accs: &mut AccColumnRef, merging_acc_idx: IdxSelection<'_>, ) -> Result<()> { - let accs = downcast_any!(accs, mut AccCountColumn)?; - let merging_accs = downcast_any!(merging_accs, mut AccCountColumn)?; + let accs = downcast_any!(accs, mut AccPrimColumn<i64>)?; + let merging_accs = downcast_any!(merging_accs, mut AccPrimColumn<i64>)?; accs.ensure_size(acc_idx); idx_for_zipped! { ((acc_idx, merging_acc_idx) in (acc_idx, merging_acc_idx)) => { - if acc_idx < accs.values.len() { - accs.values[acc_idx] += merging_accs.values[merging_acc_idx]; - } else { - accs.values.push(merging_accs.values[merging_acc_idx]); - } + let v = match (accs.value(acc_idx), merging_accs.value(merging_acc_idx)) { + (Some(a), Some(b)) => Some(a + b), + (Some(a), _) => Some(a), + (_, Some(b)) => Some(b), + _ => Some(0), + }; + accs.set_value(acc_idx, v); } } Ok(()) } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - let accs = downcast_any!(accs, mut AccCountColumn)?; - - idx_with_iter! 
{ - (acc_idx_iter @ acc_idx) => { - Ok(Arc::new(Int64Array::from_iter_values( - acc_idx_iter.map(|idx| accs.values[idx]) - ))) - } - } - } -} - -pub struct AccCountColumn { - pub values: Vec<i64>, -} - -impl AccColumn for AccCountColumn { - fn as_any(&self) -> &dyn Any { - self - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - self - } - - fn resize(&mut self, num_accs: usize) { - self.values.resize(num_accs, 0); - } - - fn shrink_to_fit(&mut self) { - self.values.shrink_to_fit(); - } - - fn num_records(&self) -> usize { - self.values.len() - } - - fn mem_used(&self) -> usize { - self.values.capacity() * 2 * size_of::<i64>() - } - - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - let mut array_idx = 0; - - idx_for! { - (idx in idx) => { - write_len(self.values[idx] as usize, &mut array[array_idx])?; - array_idx += 1; - } - } - Ok(()) - } - - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - assert_eq!(self.num_records(), 0, "expect empty AccColumn"); - for cursor in cursors { - self.values.push(read_len(cursor)? as i64); - } - Ok(()) - } - - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { - idx_for! { - (idx in idx) => { - write_len(self.values[idx] as usize, w)?; - } - } - Ok(()) - } - - fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> Result<()> { - assert_eq!(self.num_records(), 0, "expect empty AccColumn"); - for _ in 0..num_rows { - self.values.push(read_len(r)? 
as i64); - } - Ok(()) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs b/native-engine/datafusion-ext-plans/src/agg/first.rs index 66b81eea..001e7ad8 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first.rs @@ -15,7 +15,6 @@ use std::{ any::Any, fmt::{Debug, Formatter}, - io::Cursor, sync::Arc, }; @@ -31,7 +30,7 @@ use crate::{ Agg, acc::{ AccBooleanColumn, AccBytes, AccBytesColumn, AccColumn, AccColumnRef, AccPrimColumn, - AccScalarValueColumn, acc_generic_column_to_array, create_acc_generic_column, + AccScalarValueColumn, create_acc_generic_column, }, agg::IdxSelection, }, @@ -42,11 +41,17 @@ use crate::{ pub struct AggFirst { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, } impl AggFirst { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { - Ok(Self { child, data_type }) + let acc_array_data_types = vec![data_type.clone(), DataType::Boolean]; + Ok(Self { + child, + data_type, + acc_array_data_types, + }) } } @@ -82,11 +87,15 @@ impl Agg for AggFirst { fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { Box::new(AccFirstColumn { - values: create_acc_generic_column(&self.data_type, num_rows), + values: create_acc_generic_column(self.data_type.clone(), num_rows), flags: AccBooleanColumn::new(num_rows), }) } + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types + } + fn partial_update( &self, accs: &mut AccColumnRef, @@ -266,8 +275,7 @@ impl Agg for AggFirst { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - let accs = downcast_any!(accs, mut AccFirstColumn)?; - acc_generic_column_to_array(&mut accs.values, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } @@ -311,19 +319,19 @@ impl AccColumn for AccFirstColumn { self.values.mem_used() + 
self.flags.mem_used() } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - self.values.freeze_to_rows(idx, array)?; - self.flags.freeze_to_rows(idx, array)?; - Ok(()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let value_array = self.values.freeze_to_arrays(idx)?[0].clone(); + let flags_array = self.flags.freeze_to_arrays(idx)?[0].clone(); + Ok(vec![value_array, flags_array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { - self.values.unfreeze_from_rows(cursors)?; - self.flags.unfreeze_from_rows(cursors)?; + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { + self.values.unfreeze_from_arrays(&arrays[0..1])?; + self.flags.unfreeze_from_arrays(&arrays[1..2])?; Ok(()) } - fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> Result<()> { self.values.spill(idx, w)?; self.flags.spill(idx, w)?; Ok(()) diff --git a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs index cb1d5e71..e6966e21 100644 --- a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs +++ b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs @@ -27,7 +27,7 @@ use crate::{ Agg, acc::{ AccBooleanColumn, AccBytes, AccBytesColumn, AccColumnRef, AccPrimColumn, - AccScalarValueColumn, acc_generic_column_to_array, create_acc_generic_column, + AccScalarValueColumn, create_acc_generic_column, }, agg::IdxSelection, }, @@ -37,11 +37,17 @@ use crate::{ pub struct AggFirstIgnoresNull { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, } impl AggFirstIgnoresNull { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { - Ok(Self { child, data_type }) + let acc_array_data_types = vec![data_type.clone()]; + 
Ok(Self { + child, + data_type, + acc_array_data_types, + }) } } @@ -76,7 +82,11 @@ impl Agg for AggFirstIgnoresNull { } fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { - create_acc_generic_column(&self.data_type, num_rows) + create_acc_generic_column(self.data_type.clone(), num_rows) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types } fn partial_update( @@ -216,6 +226,6 @@ impl Agg for AggFirstIgnoresNull { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - acc_generic_column_to_array(accs, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs index efe6c39a..ae9c5ab0 100644 --- a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs +++ b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs @@ -29,7 +29,7 @@ use crate::{ Agg, acc::{ AccBooleanColumn, AccBytes, AccBytesColumn, AccColumn, AccColumnRef, AccPrimColumn, - AccScalarValueColumn, acc_generic_column_to_array, create_acc_generic_column, + AccScalarValueColumn, create_acc_generic_column, }, agg::IdxSelection, }, @@ -42,14 +42,17 @@ pub type AggMin = AggMaxMin<AggMinParams>; pub struct AggMaxMin<P: AggMaxMinParams> { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, _phantom: PhantomData<P>, } impl<P: AggMaxMinParams> AggMaxMin<P> { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { + let acc_array_data_types = vec![data_type.clone()]; Ok(Self { child, data_type, + acc_array_data_types, _phantom: Default::default(), }) } @@ -86,7 +89,11 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> { } fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { - create_acc_generic_column(&self.data_type, num_rows) + create_acc_generic_column(self.data_type.clone(), num_rows) + } + + fn acc_array_data_types(&self) -> 
&[DataType] { + &self.acc_array_data_types } fn partial_update( @@ -285,7 +292,7 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - acc_generic_column_to_array(accs, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs index 554cbff2..c891285f 100644 --- a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs +++ b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs @@ -20,7 +20,10 @@ use std::{ }; use arrow::{ - array::{Array, ArrayRef, StructArray, as_struct_array, make_array}, + array::{ + Array, ArrayAccessor, ArrayRef, BinaryArray, BinaryBuilder, StructArray, as_struct_array, + make_array, + }, datatypes::{DataType, Field, Schema, SchemaRef}, ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi}, record_batch::{RecordBatch, RecordBatchOptions}, @@ -237,6 +240,10 @@ impl Agg for SparkUDAFWrapper { Box::new(AccUDAFBufferRowsColumn { obj, jcontext }) } + fn acc_array_data_types(&self) -> &[DataType] { + &[DataType::Binary] + } + fn with_new_exprs(&self, _exprs: Vec<PhysicalExprRef>) -> Result<Arc<dyn Agg>> { Ok(Arc::new(Self::try_new( self.serialized.clone(), @@ -288,12 +295,11 @@ pub struct AccUDAFBufferRowsColumn { } impl AccUDAFBufferRowsColumn { - pub fn freeze_to_rows_with_indices_cache( + pub fn freeze_to_array_with_indices_cache( &self, idx: IdxSelection<'_>, - array: &mut [Vec<u8>], cache: &OnceCell<LocalRef>, - ) -> Result<()> { + ) -> Result<ArrayRef> { let idx_array = cache.get_or_try_init(move || jni_new_prim_array!(int, &idx.to_int32_vec()[..]))?; let serialized = jni_call!( @@ -306,15 +312,18 @@ impl AccUDAFBufferRowsColumn { jni_get_byte_array_region!(serialized.as_obj(), 0, &mut serialized_bytes[..])?; // UnsafeRow is serialized with big-endian i32 length prefix 
- let mut cursor = Cursor::new(&serialized_bytes); - for i in 0..array.len() { - let mut bytes_len_buf = [0; 4]; - cursor.read_exact(&mut bytes_len_buf)?; + let mut serialized_pos = 0; + let mut binary_builder = BinaryBuilder::with_capacity(idx.len(), 0); + for i in 0..idx.len() { + let mut bytes_len_buf = [0u8; 4]; + bytes_len_buf.copy_from_slice(&serialized_bytes[serialized_pos..][..4]); let bytes_len = i32::from_be_bytes(bytes_len_buf) as usize; - write_len(bytes_len, &mut array[i])?; - std::io::copy(&mut (&mut cursor).take(bytes_len as u64), &mut array[i])?; + serialized_pos += 4; + + binary_builder.append_value(&serialized_bytes[serialized_pos..][..bytes_len]); + serialized_pos += bytes_len; } - Ok(()) + Ok(Arc::new(binary_builder.finish())) } pub fn spill_with_indices_cache( @@ -388,15 +397,23 @@ impl AccColumn for AccUDAFBufferRowsColumn { 0 // memory is managed in jvm side } - fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> Result<()> { - self.freeze_to_rows_with_indices_cache(idx, array, &OnceCell::new()) + fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> Result<Vec<ArrayRef>> { + let array = self.freeze_to_array_with_indices_cache(idx, &OnceCell::new())?; + Ok(vec![array]) } - fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> Result<()> { + fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> { assert_eq!(self.num_records(), 0, "expect empty AccColumn"); + let array = downcast_any!(arrays[0], BinaryArray)?; + + let mut cursors = vec![]; + for i in 0..array.len() { + cursors.push(Cursor::new(array.value(i))); + } + let mut data = vec![]; - for cursor in cursors.iter_mut() { - let bytes_len = read_len(cursor)?; + for (i, cursor) in cursors.iter_mut().enumerate() { + let bytes_len = array.value(i).len(); data.write_all((bytes_len as i32).to_be_bytes().as_ref())?; std::io::copy(&mut cursor.take(bytes_len as u64), &mut data)?; } @@ -413,7 +430,7 @@ impl AccColumn for AccUDAFBufferRowsColumn { 
Ok(()) } - fn spill(&self, _idx: IdxSelection<'_>, _buf: &mut SpillCompressedWriter) -> Result<()> { + fn spill(&mut self, _idx: IdxSelection<'_>, _buf: &mut SpillCompressedWriter) -> Result<()> { unimplemented!("should call spill_with_indices_cache instead") } diff --git a/native-engine/datafusion-ext-plans/src/agg/sum.rs b/native-engine/datafusion-ext-plans/src/agg/sum.rs index 7e1470d4..f373eb4a 100644 --- a/native-engine/datafusion-ext-plans/src/agg/sum.rs +++ b/native-engine/datafusion-ext-plans/src/agg/sum.rs @@ -25,9 +25,7 @@ use datafusion_ext_commons::{df_unimplemented_err, downcast_any}; use crate::{ agg::{ Agg, - acc::{ - AccColumnRef, AccPrimColumn, acc_generic_column_to_array, create_acc_generic_column, - }, + acc::{AccColumnRef, AccPrimColumn, create_acc_generic_column}, agg::IdxSelection, }, idx_for_zipped, @@ -36,11 +34,17 @@ use crate::{ pub struct AggSum { child: PhysicalExprRef, data_type: DataType, + acc_array_data_types: Vec<DataType>, } impl AggSum { pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> Result<Self> { - Ok(Self { child, data_type }) + let acc_array_data_types = vec![data_type.clone()]; + Ok(Self { + child, + data_type, + acc_array_data_types, + }) } } @@ -83,7 +87,11 @@ impl Agg for AggSum { } fn create_acc_column(&self, num_rows: usize) -> AccColumnRef { - create_acc_generic_column(&self.data_type, num_rows) + create_acc_generic_column(self.data_type.clone(), num_rows) + } + + fn acc_array_data_types(&self) -> &[DataType] { + &self.acc_array_data_types } fn partial_update( @@ -144,6 +152,6 @@ impl Agg for AggSum { } fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) -> Result<ArrayRef> { - acc_generic_column_to_array(accs, &self.data_type, acc_idx) + Ok(accs.freeze_to_arrays(acc_idx)?[0].clone()) } } diff --git a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala 
b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala index 51625490..3dc3c4ee 100644 --- a/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark3/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggExec.scala @@ -15,17 +15,18 @@ */ package org.apache.spark.sql.execution.blaze.plan -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.expressions.aggregate.Final +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.ExprId import org.apache.spark.sql.catalyst.expressions.NamedExpression -import org.apache.spark.sql.catalyst.expressions.aggregate.Final -import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate.BaseAggregateExec +import org.apache.spark.sql.execution.auron.plan.NativeAggBase.AggExecMode import org.apache.spark.sql.execution.blaze.plan.NativeAggBase.AggExecMode +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.BinaryType + import org.blaze.sparkver case class NativeAggExec( @@ -53,15 +54,6 @@ case class NativeAggExec( @sparkver("3.3 / 3.4 / 3.5") override val initialInputBufferOffset: Int = theInitialInputBufferOffset - override def output: Seq[Attribute] = - if (aggregateExpressions.map(_.mode).contains(Final)) { - groupingExpressions.map(_.toAttribute) ++ aggregateAttributes - } else { - groupingExpressions.map(_.toAttribute) :+ - AttributeReference(NativeAggBase.AGG_BUF_COLUMN_NAME, BinaryType, nullable = false)( - ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) - } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") override def 
isStreaming: Boolean = false diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala new file mode 100644 index 00000000..eecec5ef --- /dev/null +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.auron.util + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.APP_CALLER_CONTEXT +import org.apache.spark.util.Utils + +object TaskContextHelper extends Logging { + + private val callerContextSupported: Boolean = { + SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false) && { + try { + Utils.classForName("org.apache.hadoop.ipc.CallerContext") + Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder") + true + } catch { + case _: ClassNotFoundException => + false + case NonFatal(e) => + logWarning("Fail to load the CallerContext class", e) + false + } + } + } + + def setNativeThreadName(): Unit = { + val context: TaskContext = TaskContext.get() + val thread = Thread.currentThread() + val threadName = if (context != null) { + s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context + .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" + } else { + "auron native task " + thread.getName + } + thread.setName(threadName) + } + + def setHDFSCallerContext(): Unit = { + if (!callerContextSupported) { + return + } + val context: TaskContext = TaskContext.get() + if (context != null) { + val conf = SparkEnv.get.conf + val appId = conf.get("spark.app.id", "") + val appAttemptId = conf.get("spark.app.attempt.id", "") + // Spark executor cannot get the jobId from TaskContext, so we set a default value -1 here. 
+ val jobId = -1 + new CallerContextHelper( + "TASK", + conf.get(APP_CALLER_CONTEXT), + Option(appId), + if (appAttemptId == "") None else Option(appAttemptId), + Option(jobId), + Option(context.stageId()), + Option(context.stageAttemptNumber()), + Option(context.taskAttemptId()), + Option(context.attemptNumber())).setCurrentContext() + } + } + + /** + * Copied from Apache Spark org.apache.spark.util.CallerContext + */ + private class CallerContextHelper( + from: String, + upstreamCallerContext: Option[String] = None, + appId: Option[String] = None, + appAttemptId: Option[String] = None, + jobId: Option[Int] = None, + stageId: Option[Int] = None, + stageAttemptId: Option[Int] = None, + taskId: Option[Long] = None, + taskAttemptNumber: Option[Int] = None) + extends Logging { + + private val context = prepareContext( + "SPARK_" + + from + + appId.map("_" + _).getOrElse("") + + appAttemptId.map("_" + _).getOrElse("") + + jobId.map("_JId_" + _).getOrElse("") + + stageId.map("_SId_" + _).getOrElse("") + + stageAttemptId.map("_" + _).getOrElse("") + + taskId.map("_TId_" + _).getOrElse("") + + taskAttemptNumber.map("_" + _).getOrElse("") + + upstreamCallerContext.map("_" + _).getOrElse("")) + + private def prepareContext(context: String): String = { + lazy val len = SparkHadoopUtil.get.conf.getInt("hadoop.caller.context.max.size", 128) + if (context == null || context.length <= len) { + context + } else { + val finalContext = context.substring(0, len) + logWarning(s"Truncated Spark caller context from $context to $finalContext") + finalContext + } + } + + def setCurrentContext(): Unit = { + if (callerContextSupported) { + try { + val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext") + val builder: Class[AnyRef] = + Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder") + val builderInst = builder.getConstructor(classOf[String]).newInstance(context) + val hdfsContext = builder.getMethod("build").invoke(builderInst) + 
callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext) + } catch { + case NonFatal(e) => + logWarning("Fail to set Spark caller context", e) + } + } + } + } +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala index c9af4188..d0ebc60c 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeConverters.scala @@ -369,7 +369,7 @@ object BlazeConverters extends Logging { case p => throw new NotImplementedError( s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse( - "unknown")}, class: ${p.getClass.getName}") + "unknown")}, class: ${p.getClass.getName}") } } @@ -734,8 +734,7 @@ object BlazeConverters extends Logging { addRenameColumnsExec(convertToNative(exec.child)) case _ => if (needRenameColumns(exec.child)) { - val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) :+ - NativeAggBase.AGG_BUF_COLUMN_NAME + val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) Shims.get.createNativeRenameColumnsExec(convertToNative(exec.child), newNames) } else { convertToNative(exec.child) @@ -791,8 +790,7 @@ object BlazeConverters extends Logging { addRenameColumnsExec(convertToNative(exec.child)) case _ => if (needRenameColumns(exec.child)) { - val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) :+ - NativeAggBase.AGG_BUF_COLUMN_NAME + val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) Shims.get.createNativeRenameColumnsExec(convertToNative(exec.child), newNames) } else { convertToNative(exec.child) @@ -845,8 +843,7 @@ object BlazeConverters extends Logging { addRenameColumnsExec(convertToNative(child)) case _ => if (needRenameColumns(child)) { - val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) :+ - 
NativeAggBase.AGG_BUF_COLUMN_NAME + val newNames = exec.groupingExpressions.map(Util.getFieldNameByExprId) Shims.get.createNativeRenameColumnsExec(convertToNative(child), newNames) } else { convertToNative(child) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala index 6db25c84..e26b9a7f 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/BlazeSparkSessionExtension.scala @@ -85,7 +85,7 @@ case class BlazeColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu dumpSimpleSparkPlanTreeNode(sparkPlanTransformed) logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed - .treeString(verbose = true, addSuffix = true)}") + .treeString(verbose = true, addSuffix = true)}") // post-transform Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala index 42a3775d..4b8659f9 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/blaze/NativeHelper.scala @@ -61,7 +61,8 @@ object NativeHelper extends Logging { val nativeMemory: Long = { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory - logWarning(s"memory total: $totalMemory, onheap: $heapMemory, offheap: $offheapMemory") + logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala index 13bbf877..2078117b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/blaze/plan/NativeAggBase.scala @@ -55,7 +55,7 @@ import org.apache.spark.sql.execution.blaze.plan.NativeAggBase.HashAgg import org.apache.spark.sql.execution.blaze.plan.NativeAggBase.NativeAggrInfo import org.apache.spark.sql.execution.blaze.plan.NativeAggBase.SortAgg import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.BinaryType +import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, LongType} abstract class NativeAggBase( execMode: AggExecMode, @@ -143,9 +143,10 @@ abstract class NativeAggBase( if (nativeAggrModes.contains(pb.AggMode.FINAL)) { groupingExpressions.map(_.toAttribute) ++ aggregateAttributes } else { - groupingExpressions.map(_.toAttribute) :+ - AttributeReference(NativeAggBase.AGG_BUF_COLUMN_NAME, BinaryType, nullable = false)( - ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) + val aggBufferAttrs = nativeAggrInfos + .flatMap(_.aggBufferDataTypes) + .map(AttributeReference("", _, nullable = false)(ExprId.apply(0))) + groupingExpressions.map(_.toAttribute) ++ aggBufferAttrs } override def outputPartitioning: Partitioning = @@ -205,38 +206,18 @@ abstract class NativeAggBase( } object NativeAggBase extends Logging { - - val AGG_BUF_COLUMN_EXPR_ID = 9223372036854775807L - val AGG_BUF_COLUMN_NAME = s"#$AGG_BUF_COLUMN_EXPR_ID" - trait AggExecMode; case object HashAgg extends AggExecMode case object SortAgg extends AggExecMode - case class NativeAggrPartialState(stateAttr: Attribute, arrowType: pb.ArrowType) - - object NativeAggrPartialState { - def apply( - aggrAttr: Attribute, - stateFieldName: String, - dataType: DataType, - nullable: Boolean, - arrowType: pb.ArrowType = null): NativeAggrPartialState = { - - 
val fieldName = s"${Util.getFieldNameByExprId(aggrAttr)}[$stateFieldName]" - val stateAttr = AttributeReference(fieldName, dataType, nullable)(aggrAttr.exprId) - NativeAggrPartialState( - stateAttr, - arrowType = Option(arrowType).getOrElse(NativeConverters.convertDataType(dataType))) - } - } - case class NativeAggrInfo( mode: AggregateMode, nativeAggrs: Seq[pb.PhysicalExprNode], + aggBufferDataTypes: Seq[DataType], outputAttr: Attribute) def getNativeAggrInfo(aggr: AggregateExpression, aggrAttr: Attribute): NativeAggrInfo = { + val aggBufferDataTypes = computeNativeAggBufferDataTypes(aggr.aggregateFunction) val reducedAggr = AggregateExpression( aggr.aggregateFunction .mapChildren(e => createPlaceholder(e)) @@ -249,12 +230,17 @@ object NativeAggBase extends Logging { aggr.mode match { case Partial => - NativeAggrInfo(aggr.mode, NativeConverters.convertAggregateExpr(aggr) :: Nil, outputAttr) + NativeAggrInfo( + aggr.mode, + NativeConverters.convertAggregateExpr(aggr) :: Nil, + aggBufferDataTypes, + outputAttr) case PartialMerge | Final => NativeAggrInfo( aggr.mode, NativeConverters.convertAggregateExpr(reducedAggr) :: Nil, + aggBufferDataTypes, outputAttr) case Complete => @@ -306,4 +292,17 @@ object NativeAggBase extends Logging { } findRecursive(exec.children.head) } + + def computeNativeAggBufferDataTypes(aggr: AggregateFunction): Seq[DataType] = { + aggr match { + case _: Count => Seq(LongType) + case f: Max => Seq(f.dataType) + case f: Min => Seq(f.dataType) + case f: Sum => Seq(f.dataType) + case f: Average => Seq(f.dataType, LongType) + case f @ First(_, true) => Seq(f.dataType) + case f @ First(_, false) => Seq(f.dataType, BooleanType) + case _ => Seq(BinaryType) + } + } }
