This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch dev-columnar-agg-buf
in repository https://gitbox.apache.org/repos/asf/auron.git

commit 1a78e10cff46a001597604888ced375dcd3d1417
Author: zhangli20 <[email protected]>
AuthorDate: Wed Jan 21 17:48:44 2026 +0800

    stub
---
 native-engine/datafusion-ext-plans/src/agg/acc.rs  | 210 +++++++++------------
 native-engine/datafusion-ext-plans/src/agg/agg.rs  |   1 +
 .../datafusion-ext-plans/src/agg/agg_ctx.rs        |  85 +++++----
 .../datafusion-ext-plans/src/agg/agg_table.rs      |  12 +-
 native-engine/datafusion-ext-plans/src/agg/avg.rs  |  23 ++-
 .../datafusion-ext-plans/src/agg/bloom_filter.rs   |  43 +++--
 .../src/agg/brickhouse/collect.rs                  |   4 +
 .../src/agg/brickhouse/combine_unique.rs           |   4 +
 .../datafusion-ext-plans/src/agg/collect.rs        |  40 +++-
 .../datafusion-ext-plans/src/agg/count.rs          | 143 +++-----------
 .../datafusion-ext-plans/src/agg/first.rs          |  36 ++--
 .../src/agg/first_ignores_null.rs                  |  18 +-
 .../datafusion-ext-plans/src/agg/maxmin.rs         |  13 +-
 .../src/agg/spark_udaf_wrapper.rs                  |  46 +++--
 native-engine/datafusion-ext-plans/src/agg/sum.rs  |  20 +-
 .../apache/spark/sql/auron/AuronConverters.scala   |  15 +-
 .../sql/auron/AuronSparkSessionExtension.scala     |   2 +-
 .../org/apache/spark/sql/auron/NativeHelper.scala  |   2 +-
 .../spark/sql/auron/util/TaskContextHelper.scala   |   2 +-
 .../sql/execution/auron/plan/NativeAggBase.scala   |  48 +++--
 .../NativeParquetInsertIntoHiveTableBase.scala     |   7 +-
 21 files changed, 371 insertions(+), 403 deletions(-)

diff --git a/native-engine/datafusion-ext-plans/src/agg/acc.rs 
b/native-engine/datafusion-ext-plans/src/agg/acc.rs
index d3ce8049..eab4e183 100644
--- a/native-engine/datafusion-ext-plans/src/agg/acc.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/acc.rs
@@ -15,7 +15,7 @@
 
 use std::{
     any::Any,
-    io::{Cursor, Read, Write},
+    io::{Read, Write},
     sync::Arc,
 };
 
@@ -25,7 +25,7 @@ use arrow::{
 };
 use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use bitvec::{bitvec, vec::BitVec};
-use byteorder::{ReadBytesExt, WriteBytesExt};
+use byteorder::WriteBytesExt;
 use datafusion::common::{Result, ScalarValue, utils::proxy::VecAllocExt};
 use datafusion_ext_commons::{
     SliceAsRawBytes, UninitializedInit, df_execution_err, downcast_any,
@@ -43,9 +43,9 @@ pub trait AccColumn: Send {
     fn shrink_to_fit(&mut self);
     fn num_records(&self) -> usize;
     fn mem_used(&self) -> usize;
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()>;
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()>;
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()>;
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>>;
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()>;
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()>;
     fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> 
Result<()>;
 
     fn ensure_size(&mut self, idx: IdxSelection<'_>) {
@@ -137,8 +137,7 @@ impl AccBooleanColumn {
         }
     }
 
-    pub fn to_array(&self, dt: &DataType, idx: IdxSelection<'_>) -> 
Result<ArrayRef> {
-        assert!(dt == &DataType::Boolean);
+    pub fn to_array(&self, idx: IdxSelection<'_>) -> Result<ArrayRef> {
         idx_with_iter!((idx @ idx) => {
             Ok(Arc::new(BooleanArray::from_iter(
                 idx.map(|i| self.valids[i].then_some(self.values[i]))
@@ -174,38 +173,32 @@ impl AccColumn for AccBooleanColumn {
         self.num_records() / 4 // 2 bits for each value
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        idx_with_iter!((idx @ idx) => {
-            for (i, w) in idx.zip(array) {
-                if self.valids[i] {
-                    w.write_u8(1 + self.values[i] as u8)?;
-                } else {
-                    w.write_u8(0)?;
-                }
-            }
-        });
-        Ok(())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let array = self.to_array(idx)?;
+        Ok(vec![array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        self.resize(0);
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array = downcast_any!(&arrays[0], BooleanArray)?;
+        self.values.clear();
+        self.valids.clear();
 
-        for cursor in cursors {
-            match cursor.read_u8()? {
-                0 => {
-                    self.valids.push(false);
-                    self.values.push(false);
-                }
-                v => {
-                    self.valids.push(true);
-                    self.values.push(v - 1 != 0);
-                }
+        // fill values
+        for i in 0..array.len() {
+            self.values.push(array.value(i));
+        }
+
+        // fill valids
+        if let Some(nb) = array.nulls() {
+            for bit in nb {
+                self.valids.push(bit);
             }
         }
+        self.valids.resize(self.values.len(), true);
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         let mut buf = vec![];
 
         idx_for! {
@@ -240,13 +233,15 @@ impl AccColumn for AccBooleanColumn {
 pub struct AccPrimColumn<T: ArrowNativeType> {
     values: Vec<T>,
     valids: BitVec,
+    dt: DataType,
 }
 
 impl<T: ArrowNativeType> AccPrimColumn<T> {
-    pub fn new(num_records: usize) -> Self {
+    pub fn new(num_records: usize, dt: DataType) -> Self {
         Self {
             values: vec![T::default(); num_records],
             valids: bitvec![0; num_records],
+            dt,
         }
     }
 
@@ -329,39 +324,30 @@ impl<T: ArrowNativeType> AccColumn for AccPrimColumn<T> {
         self.values.allocated_size() + (self.valids.capacity() + 7) / 8
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        idx_with_iter!((idx @ idx) => {
-            for (i, w) in idx.zip(array) {
-                if self.valids[i] {
-                    w.write_u8(1)?;
-                    w.write_all([self.values[i]].as_raw_bytes())?;
-                } else {
-                    w.write_u8(0)?;
-                }
-            }
-        });
-        Ok(())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let array = self.to_array(&self.dt, idx)?;
+        Ok(vec![array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array_data = arrays[0].to_data();
         self.resize(0);
-        let mut value_buf = [T::default()];
-
-        for cursor in cursors {
-            let valid = cursor.read_u8()?;
-            if valid == 1 {
-                cursor.read_exact(value_buf.as_raw_bytes_mut())?;
-                self.values.push(value_buf[0]);
-                self.valids.push(true);
-            } else {
-                self.values.push(T::default());
-                self.valids.push(false);
+
+        // fill values
+        let buffer = array_data.buffer::<T>(0);
+        self.values = 
buffer[array_data.offset()..][..array_data.len()].to_vec();
+
+        // fill valids
+        if let Some(nb) = array_data.nulls() {
+            for bit in nb {
+                self.valids.push(bit);
             }
         }
+        self.valids.resize(self.values.len(), true);
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         // write valids
         let mut bits: BitVec<u8> = BitVec::with_capacity(idx.len());
         idx_for! {
@@ -411,13 +397,15 @@ impl<T: ArrowNativeType> AccColumn for AccPrimColumn<T> {
 pub struct AccBytesColumn {
     items: Vec<Option<AccBytes>>,
     heap_mem_used: usize,
+    dt: DataType,
 }
 
 impl AccBytesColumn {
-    pub fn new(num_records: usize) -> Self {
+    pub fn new(num_records: usize, dt: DataType) -> Self {
         Self {
             items: vec![None; num_records],
             heap_mem_used: 0,
+            dt,
         }
     }
 
@@ -436,13 +424,13 @@ impl AccBytesColumn {
         self.heap_mem_used += self.item_heap_mem_used(idx);
     }
 
-    fn to_array(&self, dt: &DataType, idx: IdxSelection<'_>) -> 
Result<ArrayRef> {
+    fn to_array(&self, idx: IdxSelection<'_>) -> Result<ArrayRef> {
         let binary;
 
         idx_with_iter!((idx @ idx) => {
             binary = BinaryArray::from_iter(idx.map(|i| 
self.items[i].as_ref()));
         });
-        match dt {
+        match &self.dt {
             DataType::Utf8 => Ok(make_array(
                 binary
                     .to_data()
@@ -451,7 +439,7 @@ impl AccBytesColumn {
                     .build()?,
             )),
             DataType::Binary => Ok(Arc::new(binary)),
-            _ => df_execution_err!("expected string or binary type, got 
{dt:?}"),
+            dt => df_execution_err!("expected string or binary type, got 
{dt:?}"),
         }
     }
 
@@ -532,25 +520,34 @@ impl AccColumn for AccBytesColumn {
         self.heap_mem_used + self.items.allocated_size()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        idx_with_iter!((idx @ idx) => {
-            for (i, w) in idx.zip(array) {
-                self.save_value(i, w)?;
-            }
-        });
-        Ok(())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let array = self.to_array(idx)?;
+        Ok(vec![array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        self.items.resize(0, Default::default());
-        for cursor in cursors {
-            self.load_value(cursor)?;
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array_data = arrays[0].to_data();
+        self.items.clear();
+
+        // fill values
+        let data_buffer = array_data.buffer::<u8>(0);
+        let offset_buffer = array_data.buffer::<i32>(1);
+        for i in 0..array_data.len() {
+            if array_data.is_valid(i) {
+                let offset_begin = offset_buffer[array_data.offset() + i] as 
usize;
+                let offset_end = offset_buffer[array_data.offset() + i + 1] as 
usize;
+                self.items.push(Some(AccBytes::from_slice(
+                    &data_buffer[offset_begin..offset_end],
+                )));
+            } else {
+                self.items.push(None);
+            }
         }
         self.refresh_heap_mem_used();
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         idx_for! {
             (idx in idx) => {
                 self.save_value(idx, w)?;
@@ -576,17 +573,17 @@ pub struct AccScalarValueColumn {
 }
 
 impl AccScalarValueColumn {
-    pub fn new(dt: &DataType, num_rows: usize) -> Self {
-        let null_value = ScalarValue::try_from(dt).expect("unsupported data 
type");
+    pub fn new(dt: DataType, num_rows: usize) -> Self {
+        let null_value = ScalarValue::try_from(&dt).expect("unsupported data 
type");
         Self {
             items: (0..num_rows).map(|_| null_value.clone()).collect(),
-            dt: dt.clone(),
+            dt,
             null_value,
             heap_mem_used: 0,
         }
     }
 
-    pub fn to_array(&mut self, _dt: &DataType, idx: IdxSelection<'_>) -> 
Result<ArrayRef> {
+    pub fn to_array(&mut self, idx: IdxSelection<'_>) -> Result<ArrayRef> {
         idx_with_iter!((idx @ idx) => {
             ScalarValue::iter_to_array(idx.map(|i| {
                 std::mem::replace(&mut self.items[i], self.null_value.clone())
@@ -642,38 +639,36 @@ impl AccColumn for AccScalarValueColumn {
         self.heap_mem_used + self.items.allocated_size()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        idx_with_iter!((idx @ idx) => {
-            for (i, w) in idx.zip(array) {
-                write_scalar(&self.items[i], true, w)?;
-            }
-        });
-        Ok(())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let array = self.to_array(idx)?; // data type is not used
+        Ok(vec![array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        self.items.truncate(0);
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array = &arrays[0];
+
+        self.items.clear();
         self.heap_mem_used = 0;
 
-        for cursor in cursors {
-            let scalar = read_scalar(cursor, &self.dt, true)?;
+        for i in 0..array.len() {
+            let scalar = ScalarValue::try_from_array(array, i)?;
             self.heap_mem_used += scalar_value_heap_mem_size(&scalar);
-            self.items.push(scalar);
         }
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         idx_for! {
             (idx in idx) => {
-                write_scalar(&self.items[idx], true, w)?;
+                let scalar = self.take_value(idx);
+                write_scalar(&scalar, true, w)?;
             }
         }
         Ok(())
     }
 
     fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> 
Result<()> {
-        self.items.truncate(0);
+        self.items.clear();
         self.heap_mem_used = 0;
 
         for _ in 0..num_rows {
@@ -685,43 +680,18 @@ impl AccColumn for AccScalarValueColumn {
     }
 }
 
-pub fn create_acc_generic_column(dt: &DataType, num_rows: usize) -> 
AccColumnRef {
+pub fn create_acc_generic_column(dt: DataType, num_rows: usize) -> 
AccColumnRef {
     macro_rules! primitive_helper {
         ($t:ty) => {
             Box::new(AccPrimColumn::<<$t as ArrowPrimitiveType>::Native>::new(
-                num_rows,
+                num_rows, dt,
             ))
         };
     }
     downcast_primitive! {
         dt => (primitive_helper),
         DataType::Boolean => Box::new(AccBooleanColumn::new(num_rows)),
-        DataType::Utf8 | DataType::Binary => 
Box::new(AccBytesColumn::new(num_rows)),
+        DataType::Utf8 | DataType::Binary => 
Box::new(AccBytesColumn::new(num_rows, dt)),
         other => Box::new(AccScalarValueColumn::new(other, num_rows)),
     }
 }
-
-pub fn acc_generic_column_to_array(
-    column: &mut AccColumnRef,
-    dt: &DataType,
-    idx: IdxSelection<'_>,
-) -> Result<ArrayRef> {
-    macro_rules! primitive_helper {
-        ($t:ty) => {
-            downcast_any!(column, mut AccPrimColumn::<<$t as 
ArrowPrimitiveType>::Native>)?
-                .to_array(dt, idx)
-        };
-    }
-    downcast_primitive! {
-        dt => (primitive_helper),
-        DataType::Boolean => {
-            downcast_any!(column, mut AccBooleanColumn)?.to_array(dt, idx)
-        }
-        DataType::Utf8 | DataType::Binary => {
-            downcast_any!(column, mut AccBytesColumn)?.to_array(dt, idx)
-        }
-        _other => {
-            downcast_any!(column, mut AccScalarValueColumn)?.to_array(dt, idx)
-        }
-    }
-}
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg.rs 
b/native-engine/datafusion-ext-plans/src/agg/agg.rs
index b43980ed..5eb4c3da 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg.rs
@@ -44,6 +44,7 @@ pub trait Agg: Send + Sync + Debug {
     fn data_type(&self) -> &DataType;
     fn nullable(&self) -> bool;
     fn create_acc_column(&self, num_rows: usize) -> AccColumnRef;
+    fn acc_array_data_types(&self) -> &[DataType];
     fn with_new_exprs(&self, exprs: Vec<PhysicalExprRef>) -> Result<Arc<dyn 
Agg>>;
 
     fn prepare_partial_args(&self, partial_inputs: &[ArrayRef]) -> 
Result<Vec<ArrayRef>> {
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs 
b/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs
index 2794684f..d36876eb 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_ctx.rs
@@ -15,13 +15,12 @@
 
 use std::{
     fmt::{Debug, Formatter},
-    io::Cursor,
     sync::Arc,
 };
 
 use arrow::{
-    array::{ArrayRef, BinaryArray, RecordBatchOptions},
-    datatypes::{DataType, Field, Fields, Schema, SchemaRef},
+    array::{Array, ArrayRef, RecordBatchOptions},
+    datatypes::{Field, Fields, Schema, SchemaRef},
     record_batch::RecordBatch,
     row::{RowConverter, Rows, SortField},
 };
@@ -29,20 +28,16 @@ use auron_jni_bridge::{
     conf,
     conf::{BooleanConf, DoubleConf, IntConf},
 };
-use datafusion::{
-    common::{Result, cast::as_binary_array},
-    physical_expr::PhysicalExprRef,
-};
+use datafusion::{common::Result, physical_expr::PhysicalExprRef};
 use datafusion_ext_commons::{downcast_any, suggested_batch_mem_size};
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 
 use crate::{
     agg::{
-        AGG_BUF_COLUMN_NAME, AggExecMode, AggExpr, AggMode, GroupingExpr,
+        AggExecMode, AggExpr, AggMode, GroupingExpr,
         acc::AccTable,
         agg::{Agg, IdxSelection},
-        agg_hash_map::AggHashMapKey,
         spark_udaf_wrapper::{AccUDAFBufferRowsColumn, SparkUDAFMemTracker, 
SparkUDAFWrapper},
     },
     common::{
@@ -63,6 +58,8 @@ pub struct AggContext {
     pub grouping_row_converter: Arc<Mutex<RowConverter>>,
     pub groupings: Vec<GroupingExpr>,
     pub aggs: Vec<AggExpr>,
+    pub input_acc_arrays_len: usize,
+    pub output_acc_arrays_len: usize,
     pub supports_partial_skipping: bool,
     pub partial_skipping_ratio: f64,
     pub partial_skipping_min_rows: usize,
@@ -137,7 +134,11 @@ impl AggContext {
                 ));
             }
         } else {
-            agg_fields.push(Field::new(AGG_BUF_COLUMN_NAME, DataType::Binary, 
false));
+            for agg in &aggs {
+                for dt in agg.agg.acc_array_data_types() {
+                    agg_fields.push(Field::new("", dt.clone(), true));
+                }
+            }
         }
         let agg_schema = Arc::new(Schema::new(agg_fields));
         let output_schema = Arc::new(Schema::new(
@@ -148,6 +149,17 @@ impl AggContext {
             .concat(),
         ));
 
+        let input_acc_arrays_len = aggs
+            .iter()
+            .filter(|agg| agg.mode.is_partial_merge() || agg.mode.is_final())
+            .map(|agg| agg.agg.acc_array_data_types().len())
+            .sum();
+        let output_acc_arrays_len = aggs
+            .iter()
+            .filter(|agg| agg.mode.is_partial() || agg.mode.is_partial_merge())
+            .map(|agg| agg.agg.acc_array_data_types().len())
+            .sum();
+
         let agg_exprs_flatten: Vec<PhysicalExprRef> = aggs
             .iter()
             .filter(|agg| agg.mode.is_partial())
@@ -195,6 +207,8 @@ impl AggContext {
             grouping_row_converter,
             groupings,
             aggs,
+            input_acc_arrays_len,
+            output_acc_arrays_len,
             agg_expr_evaluator,
             supports_partial_skipping,
             partial_skipping_ratio,
@@ -277,22 +291,15 @@ impl AggContext {
             let mut merging_acc_table = self.create_acc_table(0);
 
             if self.need_partial_merge {
-                let partial_merged_array =
-                    as_binary_array(batch.columns().last().expect("last 
column"))?;
-                let array = partial_merged_array
-                    .iter()
-                    .skip(batch_start_idx)
-                    .take(batch_end_idx - batch_start_idx)
-                    .map(|bytes| bytes.expect("non-null bytes"))
-                    .collect::<Vec<_>>();
-                let mut cursors = array
-                    .iter()
-                    .map(|bytes| Cursor::new(bytes.as_bytes()))
-                    .collect::<Vec<_>>();
-
-                for (agg_idx, _agg) in &self.need_partial_merge_aggs {
-                    let acc_col = &mut merging_acc_table.cols_mut()[*agg_idx];
-                    acc_col.unfreeze_from_rows(&mut cursors)?;
+                let mut acc_arrays_start = batch.num_columns() - 
self.input_acc_arrays_len;
+                for (agg_idx, agg) in &self.need_partial_merge_aggs {
+                    let acc_arrays = &batch.columns()[acc_arrays_start..]
+                        .iter()
+                        .take(agg.acc_array_data_types().len())
+                        .map(|array| array.slice(batch_start_idx, 
batch_end_idx - batch_start_idx))
+                        .collect::<Vec<_>>();
+                    acc_arrays_start += agg.acc_array_data_types().len();
+                    
merging_acc_table.cols_mut()[*agg_idx].unfreeze_from_arrays(&acc_arrays)?;
                 }
             }
             let batch_selection = IdxSelection::Range(0, batch_end_idx - 
batch_start_idx);
@@ -320,9 +327,7 @@ impl AggContext {
             }
             Ok(agg_columns)
         } else {
-            // output acc as a binary column
-            let freezed = self.freeze_acc_table(acc_table, idx)?;
-            Ok(vec![Arc::new(BinaryArray::from_iter_values(freezed))])
+            self.freeze_acc_table(acc_table, idx)
         }
     }
 
@@ -407,23 +412,23 @@ impl AggContext {
 
     pub fn freeze_acc_table(
         &self,
-        acc_table: &AccTable,
+        acc_table: &mut AccTable,
         acc_idx: IdxSelection,
-    ) -> Result<Vec<Vec<u8>>> {
+    ) -> Result<Vec<ArrayRef>> {
         let udaf_indices_cache = OnceCell::new();
-        let mut vec = vec![vec![]; acc_idx.len()];
-        for acc_col in acc_table.cols() {
+        let mut arrays = vec![];
+
+        for acc_col in acc_table.cols_mut() {
             if let Ok(udaf_acc_col) = downcast_any!(acc_col, 
AccUDAFBufferRowsColumn) {
-                udaf_acc_col.freeze_to_rows_with_indices_cache(
-                    acc_idx,
-                    &mut vec,
-                    &udaf_indices_cache,
-                )?;
+                arrays.push(
+                    udaf_acc_col
+                        .freeze_to_array_with_indices_cache(acc_idx, 
&udaf_indices_cache)?,
+                );
             } else {
-                acc_col.freeze_to_rows(acc_idx, &mut vec)?;
+                arrays.extend(acc_col.freeze_to_arrays(acc_idx)?);
             }
         }
-        Ok(vec)
+        Ok(arrays)
     }
 
     pub async fn process_partial_skipped(
diff --git a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs 
b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
index c432c892..fc3a6617 100644
--- a/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/agg_table.rs
@@ -544,7 +544,7 @@ impl HashingData {
         // sort all records using radix sort on hashcodes of keys
         let num_spill_buckets = 
self.agg_ctx.num_spill_buckets(self.mem_used());
         let key_rows = self.map.into_keys();
-        let acc_table = self.acc_table;
+        let mut acc_table = self.acc_table;
         let mut entries = key_rows
             .iter()
             .enumerate()
@@ -568,7 +568,7 @@ impl HashingData {
                 write_spill_bucket(
                     &mut writer,
                     &self.agg_ctx,
-                    &acc_table,
+                    &mut acc_table,
                     entries[offset..][..cur_bucket_count]
                         .iter()
                         .map(|&(_, record_idx)| &key_rows[record_idx as 
usize]),
@@ -674,7 +674,7 @@ impl MergingData {
         entries.shrink_to_fit();
 
         let key_rows = self.key_rows;
-        let acc_table = self.acc_table;
+        let mut acc_table = self.acc_table;
         let mut bucket_counts = vec![0; num_spill_buckets];
 
         radix_sort_by_key(&mut entries, &mut bucket_counts, |(bucket_id, ..)| {
@@ -693,7 +693,7 @@ impl MergingData {
                 write_spill_bucket(
                     &mut writer,
                     &self.agg_ctx,
-                    &acc_table,
+                    &mut acc_table,
                     entries[offset..][..cur_bucket_count].iter().map(
                         |&(_, batch_idx, row_idx, _)| {
                             key_rows[batch_idx as usize]
@@ -722,7 +722,7 @@ impl MergingData {
 fn write_spill_bucket(
     w: &mut SpillCompressedWriter,
     agg_ctx: &AggContext,
-    acc_table: &AccTable,
+    acc_table: &mut AccTable,
     key_iter: impl Iterator<Item = impl AsRef<[u8]>>,
     acc_idx_iter: impl Iterator<Item = usize>,
     spill_idx: usize,
@@ -730,7 +730,7 @@ fn write_spill_bucket(
     // write accs
     let udaf_indices_cache = OnceCell::new();
     let acc_indices: Vec<usize> = acc_idx_iter.collect();
-    for col in acc_table.cols() {
+    for col in acc_table.cols_mut() {
         if let Ok(udaf_col) = downcast_any!(col, AccUDAFBufferRowsColumn) {
             udaf_col.spill_with_indices_cache(
                 IdxSelection::Indices(&acc_indices),
diff --git a/native-engine/datafusion-ext-plans/src/agg/avg.rs 
b/native-engine/datafusion-ext-plans/src/agg/avg.rs
index 008dd34b..47c5874b 100644
--- a/native-engine/datafusion-ext-plans/src/agg/avg.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/avg.rs
@@ -16,7 +16,6 @@
 use std::{
     any::Any,
     fmt::{Debug, Formatter},
-    io::Cursor,
     sync::Arc,
 };
 
@@ -44,6 +43,7 @@ pub struct AggAvg {
     data_type: DataType,
     agg_sum: AggSum,
     agg_count: AggCount,
+    acc_array_data_types: Vec<DataType>,
 }
 
 impl AggAvg {
@@ -51,6 +51,7 @@ impl AggAvg {
         let agg_sum = AggSum::try_new(child.clone(), data_type.clone())?;
         let agg_count = AggCount::try_new(vec![child.clone()], 
DataType::Int64)?;
         Ok(Self {
+            acc_array_data_types: vec![data_type.clone(), DataType::Int64],
             child,
             data_type,
             agg_sum,
@@ -104,6 +105,10 @@ impl Agg for AggAvg {
         })
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &self.acc_array_data_types
+    }
+
     fn partial_update(
         &self,
         accs: &mut AccColumnRef,
@@ -206,19 +211,19 @@ impl AccColumn for AccAvgColumn {
         self.sum.mem_used() + self.count.mem_used()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        self.sum.freeze_to_rows(idx, array)?;
-        self.count.freeze_to_rows(idx, array)?;
-        Ok(())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let sum_array = self.sum.freeze_to_arrays(idx)?[0].clone();
+        let count_array = self.count.freeze_to_arrays(idx)?[0].clone();
+        Ok(vec![sum_array, count_array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        self.sum.unfreeze_from_rows(cursors)?;
-        self.count.unfreeze_from_rows(cursors)?;
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        self.sum.unfreeze_from_arrays(&arrays[0..1])?;
+        self.count.unfreeze_from_arrays(&arrays[1..2])?;
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, buf: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, buf: &mut 
SpillCompressedWriter) -> Result<()> {
         self.sum.spill(idx, buf)?;
         self.count.spill(idx, buf)?;
         Ok(())
diff --git a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs 
b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs
index 6aff3c84..4688e131 100644
--- a/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/bloom_filter.rs
@@ -21,7 +21,7 @@ use std::{
 };
 
 use arrow::{
-    array::{ArrayRef, AsArray, BinaryBuilder},
+    array::{ArrayRef, AsArray, BinaryArray, BinaryBuilder},
     datatypes::{DataType, Int64Type},
 };
 use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
@@ -108,6 +108,10 @@ impl Agg for AggBloomFilter {
         bloom_filters
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &[DataType::Binary]
+    }
+
     fn partial_update(
         &self,
         accs: &mut AccColumnRef,
@@ -245,36 +249,41 @@ impl AccColumn for AccBloomFilterColumn {
             .sum()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let mut array = vec![];
+
         idx_for! {
             (idx in idx) => {
-                let w = &mut array[idx];
                 if let Some(bloom_filter) = &self.bloom_filters[idx] {
+                    let mut w = vec![];
                     w.write_u8(1)?;
-                    bloom_filter.write_to(w)?;
+                    bloom_filter.write_to(&mut w)?;
+                    array.push(Some(w));
                 } else {
-                    w.write_u8(0)?;
+                    array.push(None);
                 }
             }
         }
-        Ok(())
+
+        let array = Arc::new(BinaryArray::from_iter(array));
+        Ok(vec![array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        assert_eq!(self.num_records(), 0, "expect empty AccColumn");
-        for r in cursors {
-            self.bloom_filters.push({
-                if r.read_u8()? == 1 {
-                    Some(SparkBloomFilter::read_from(r)?)
-                } else {
-                    None
-                }
-            });
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array = downcast_any!(arrays[0], BinaryArray)?;
+
+        for v in array {
+            if let Some(w) = v {
+                self.bloom_filters
+                    .push(Some(SparkBloomFilter::read_from(&mut 
Cursor::new(w))?));
+            } else {
+                self.bloom_filters.push(None);
+            }
         }
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         idx_for! {
             (idx in idx) => {
                 if let Some(bloom_filter) = &self.bloom_filters[idx] {
diff --git a/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs 
b/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs
index 8d982b48..f7aeb69e 100644
--- a/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/brickhouse/collect.rs
@@ -81,6 +81,10 @@ impl Agg for AggCollect {
         self.innert_collect_list.create_acc_column(num_rows)
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        self.innert_collect_list.acc_array_data_types()
+    }
+
     fn partial_update(
         &self,
         accs: &mut AccColumnRef,
diff --git 
a/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs 
b/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs
index 4e7144c6..4330e1cd 100644
--- a/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/brickhouse/combine_unique.rs
@@ -81,6 +81,10 @@ impl Agg for AggCombineUnique {
         self.inner_collect_set.create_acc_column(num_rows)
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        self.inner_collect_set.acc_array_data_types()
+    }
+
     fn partial_update(
         &self,
         accs: &mut AccColumnRef,
diff --git a/native-engine/datafusion-ext-plans/src/agg/collect.rs 
b/native-engine/datafusion-ext-plans/src/agg/collect.rs
index 225d4303..b20acafd 100644
--- a/native-engine/datafusion-ext-plans/src/agg/collect.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/collect.rs
@@ -116,6 +116,10 @@ impl<C: AccCollectionColumn> Agg for AggGenericCollect<C> {
         col
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &[DataType::Binary]
+    }
+
     fn partial_update(
         &self,
         accs: &mut AccColumnRef,
@@ -292,15 +296,23 @@ impl AccColumn for AccSetColumn {
         self.mem_used + self.set.capacity() * size_of::<AccSet>()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        AccCollectionColumn::freeze_to_rows(self, idx, array)
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let mut array = vec![vec![]; idx.len()];
+        AccCollectionColumn::freeze_to_rows(self, idx, &mut array)?;
+        Ok(vec![Arc::new(BinaryArray::from_iter_values(array))])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        AccCollectionColumn::unfreeze_from_rows(self, cursors)
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array = downcast_any!(arrays[0], BinaryArray)?;
+        let mut cursors = vec![];
+
+        for i in 0..array.len() {
+            cursors.push(Cursor::new(array.value(i)));
+        }
+        AccCollectionColumn::unfreeze_from_rows(self, &mut cursors)
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         idx_for! {
             (idx in idx) => {
                 self.save_raw(idx, w)?;
@@ -404,15 +416,23 @@ impl AccColumn for AccListColumn {
         self.mem_used + self.list.capacity() * size_of::<AccList>()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        AccCollectionColumn::freeze_to_rows(self, idx, array)
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let mut array = vec![vec![]; idx.len()];
+        AccCollectionColumn::freeze_to_rows(self, idx, &mut array)?;
+        Ok(vec![Arc::new(BinaryArray::from_iter_values(array))])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        AccCollectionColumn::unfreeze_from_rows(self, cursors)
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        let array = downcast_any!(arrays[0], BinaryArray)?;
+        let mut cursors = vec![];
+
+        for i in 0..array.len() {
+            cursors.push(Cursor::new(array.value(i)));
+        }
+        AccCollectionColumn::unfreeze_from_rows(self, &mut cursors)
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         idx_for! {
             (idx in idx) => {
                 self.save_raw(idx, w)?;
diff --git a/native-engine/datafusion-ext-plans/src/agg/count.rs 
b/native-engine/datafusion-ext-plans/src/agg/count.rs
index ab17c5d3..1b5a47f4 100644
--- a/native-engine/datafusion-ext-plans/src/agg/count.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/count.rs
@@ -16,24 +16,19 @@
 use std::{
     any::Any,
     fmt::{Debug, Formatter},
-    io::Cursor,
     sync::Arc,
 };
 
 use arrow::{array::*, datatypes::*};
-use auron_memmgr::spill::{SpillCompressedReader, SpillCompressedWriter};
 use datafusion::{common::Result, physical_expr::PhysicalExprRef};
-use datafusion_ext_commons::{
-    downcast_any,
-    io::{read_len, write_len},
-};
+use datafusion_ext_commons::downcast_any;
 
 use crate::{
     agg::{
-        acc::{AccColumn, AccColumnRef},
+        acc::{AccColumn, AccColumnRef, AccPrimColumn},
         agg::{Agg, IdxSelection},
     },
-    idx_for, idx_for_zipped, idx_with_iter,
+    idx_for_zipped,
 };
 
 pub struct AggCount {
@@ -82,9 +77,11 @@ impl Agg for AggCount {
     }
 
     fn create_acc_column(&self, num_rows: usize) -> Box<dyn AccColumn> {
-        Box::new(AccCountColumn {
-            values: vec![0; num_rows],
-        })
+        Box::new(AccPrimColumn::<i64>::new(num_rows, DataType::Int64))
+    }
+
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &[DataType::Int64]
     }
 
     fn partial_update(
@@ -94,32 +91,15 @@ impl Agg for AggCount {
         partial_args: &[ArrayRef],
         partial_arg_idx: IdxSelection<'_>,
     ) -> Result<()> {
-        let accs = downcast_any!(accs, mut AccCountColumn)?;
+        let accs = downcast_any!(accs, mut AccPrimColumn<i64>)?;
         accs.ensure_size(acc_idx);
 
-        if partial_args.is_empty() {
-            idx_for_zipped! {
-                ((acc_idx, _partial_arg_idx) in (acc_idx, partial_arg_idx)) => 
{
-                    if acc_idx >= accs.values.len() {
-                        accs.values.push(1);
-                    } else {
-                        accs.values[acc_idx] += 1;
-                    }
-                }
-            }
-        } else {
-            idx_for_zipped! {
-                ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
-                    let add = partial_args
-                        .iter()
-                        .all(|arg| arg.is_valid(partial_arg_idx)) as i64;
-
-                    if acc_idx >= accs.values.len() {
-                        accs.values.push(add);
-                    } else {
-                        accs.values[acc_idx] += add;
-                    }
-                }
+        idx_for_zipped! {
+            ((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
+                let add = partial_args
+                    .iter()
+                    .all(|arg| arg.is_valid(partial_arg_idx)) as i64;
+                accs.set_value(acc_idx, Some(accs.value(acc_idx).unwrap_or(0) 
+ add));
             }
         }
         Ok(())
@@ -132,98 +112,25 @@ impl Agg for AggCount {
         merging_accs: &mut AccColumnRef,
         merging_acc_idx: IdxSelection<'_>,
     ) -> Result<()> {
-        let accs = downcast_any!(accs, mut AccCountColumn)?;
-        let merging_accs = downcast_any!(merging_accs, mut AccCountColumn)?;
+        let accs = downcast_any!(accs, mut AccPrimColumn<i64>)?;
+        let merging_accs = downcast_any!(merging_accs, mut 
AccPrimColumn<i64>)?;
         accs.ensure_size(acc_idx);
 
         idx_for_zipped! {
             ((acc_idx, merging_acc_idx) in (acc_idx, merging_acc_idx)) => {
-                if acc_idx < accs.values.len() {
-                    accs.values[acc_idx] += 
merging_accs.values[merging_acc_idx];
-                } else {
-                    accs.values.push(merging_accs.values[merging_acc_idx]);
-                }
+                let v = match (accs.value(acc_idx), 
merging_accs.value(merging_acc_idx)) {
+                    (Some(a), Some(b)) => Some(a + b),
+                    (Some(a), _) => Some(a),
+                    (_, Some(b)) => Some(b),
+                    _ => Some(0),
+                };
+                accs.set_value(acc_idx, v);
             }
         }
         Ok(())
     }
 
     fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) 
-> Result<ArrayRef> {
-        let accs = downcast_any!(accs, mut AccCountColumn)?;
-
-        idx_with_iter! {
-            (acc_idx_iter @ acc_idx) => {
-                Ok(Arc::new(Int64Array::from_iter_values(
-                    acc_idx_iter.map(|idx| accs.values[idx])
-                )))
-            }
-        }
-    }
-}
-
-pub struct AccCountColumn {
-    pub values: Vec<i64>,
-}
-
-impl AccColumn for AccCountColumn {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn as_any_mut(&mut self) -> &mut dyn Any {
-        self
-    }
-
-    fn resize(&mut self, num_accs: usize) {
-        self.values.resize(num_accs, 0);
-    }
-
-    fn shrink_to_fit(&mut self) {
-        self.values.shrink_to_fit();
-    }
-
-    fn num_records(&self) -> usize {
-        self.values.len()
-    }
-
-    fn mem_used(&self) -> usize {
-        self.values.capacity() * 2 * size_of::<i64>()
-    }
-
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        let mut array_idx = 0;
-
-        idx_for! {
-            (idx in idx) => {
-                write_len(self.values[idx] as usize, &mut array[array_idx])?;
-                array_idx += 1;
-            }
-        }
-        Ok(())
-    }
-
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        assert_eq!(self.num_records(), 0, "expect empty AccColumn");
-        for cursor in cursors {
-            self.values.push(read_len(cursor)? as i64);
-        }
-        Ok(())
-    }
-
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
-        idx_for! {
-            (idx in idx) => {
-                write_len(self.values[idx] as usize, w)?;
-            }
-        }
-        Ok(())
-    }
-
-    fn unspill(&mut self, num_rows: usize, r: &mut SpillCompressedReader) -> 
Result<()> {
-        assert_eq!(self.num_records(), 0, "expect empty AccColumn");
-        for _ in 0..num_rows {
-            self.values.push(read_len(r)? as i64);
-        }
-        Ok(())
+        Ok(accs.freeze_to_arrays(acc_idx)?[0].clone())
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/agg/first.rs 
b/native-engine/datafusion-ext-plans/src/agg/first.rs
index b4bbc115..560cc5d3 100644
--- a/native-engine/datafusion-ext-plans/src/agg/first.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/first.rs
@@ -16,7 +16,6 @@
 use std::{
     any::Any,
     fmt::{Debug, Formatter},
-    io::Cursor,
     sync::Arc,
 };
 
@@ -33,7 +32,7 @@ use crate::{
         Agg,
         acc::{
             AccBooleanColumn, AccBytes, AccBytesColumn, AccColumn, 
AccColumnRef, AccPrimColumn,
-            AccScalarValueColumn, acc_generic_column_to_array, 
create_acc_generic_column,
+            AccScalarValueColumn, create_acc_generic_column,
         },
         agg::IdxSelection,
     },
@@ -43,11 +42,17 @@ use crate::{
 pub struct AggFirst {
     child: PhysicalExprRef,
     data_type: DataType,
+    acc_array_data_types: Vec<DataType>,
 }
 
 impl AggFirst {
     pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> 
Result<Self> {
-        Ok(Self { child, data_type })
+        let acc_array_data_types = vec![data_type.clone(), DataType::Boolean];
+        Ok(Self {
+            child,
+            data_type,
+            acc_array_data_types,
+        })
     }
 }
 
@@ -83,11 +88,15 @@ impl Agg for AggFirst {
 
     fn create_acc_column(&self, num_rows: usize) -> AccColumnRef {
         Box::new(AccFirstColumn {
-            values: create_acc_generic_column(&self.data_type, num_rows),
+            values: create_acc_generic_column(self.data_type.clone(), 
num_rows),
             flags: AccBooleanColumn::new(num_rows),
         })
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &self.acc_array_data_types
+    }
+
     fn partial_update(
         &self,
         accs: &mut AccColumnRef,
@@ -267,8 +276,7 @@ impl Agg for AggFirst {
     }
 
     fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) 
-> Result<ArrayRef> {
-        let accs = downcast_any!(accs, mut AccFirstColumn)?;
-        acc_generic_column_to_array(&mut accs.values, &self.data_type, acc_idx)
+        Ok(accs.freeze_to_arrays(acc_idx)?[0].clone())
     }
 }
 
@@ -312,19 +320,19 @@ impl AccColumn for AccFirstColumn {
         self.values.mem_used() + self.flags.mem_used()
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        self.values.freeze_to_rows(idx, array)?;
-        self.flags.freeze_to_rows(idx, array)?;
-        Ok(())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let value_array = self.values.freeze_to_arrays(idx)?[0].clone();
+        let flags_array = self.flags.freeze_to_arrays(idx)?[0].clone();
+        Ok(vec![value_array, flags_array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
-        self.values.unfreeze_from_rows(cursors)?;
-        self.flags.unfreeze_from_rows(cursors)?;
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
+        self.values.unfreeze_from_arrays(&arrays[0..1])?;
+        self.flags.unfreeze_from_arrays(&arrays[1..2])?;
         Ok(())
     }
 
-    fn spill(&self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) -> 
Result<()> {
+    fn spill(&mut self, idx: IdxSelection<'_>, w: &mut SpillCompressedWriter) 
-> Result<()> {
         self.values.spill(idx, w)?;
         self.flags.spill(idx, w)?;
         Ok(())
diff --git a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs 
b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs
index 03b2ddec..b0c79c0e 100644
--- a/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs
@@ -28,7 +28,7 @@ use crate::{
         Agg,
         acc::{
             AccBooleanColumn, AccBytes, AccBytesColumn, AccColumnRef, 
AccPrimColumn,
-            AccScalarValueColumn, acc_generic_column_to_array, 
create_acc_generic_column,
+            AccScalarValueColumn, create_acc_generic_column,
         },
         agg::IdxSelection,
     },
@@ -38,11 +38,17 @@ use crate::{
 pub struct AggFirstIgnoresNull {
     child: PhysicalExprRef,
     data_type: DataType,
+    acc_array_data_types: Vec<DataType>,
 }
 
 impl AggFirstIgnoresNull {
     pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> 
Result<Self> {
-        Ok(Self { child, data_type })
+        let acc_array_data_types = vec![data_type.clone()];
+        Ok(Self {
+            child,
+            data_type,
+            acc_array_data_types,
+        })
     }
 }
 
@@ -77,7 +83,11 @@ impl Agg for AggFirstIgnoresNull {
     }
 
     fn create_acc_column(&self, num_rows: usize) -> AccColumnRef {
-        create_acc_generic_column(&self.data_type, num_rows)
+        create_acc_generic_column(self.data_type.clone(), num_rows)
+    }
+
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &self.acc_array_data_types
     }
 
     fn partial_update(
@@ -217,6 +227,6 @@ impl Agg for AggFirstIgnoresNull {
     }
 
     fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) 
-> Result<ArrayRef> {
-        acc_generic_column_to_array(accs, &self.data_type, acc_idx)
+        Ok(accs.freeze_to_arrays(acc_idx)?[0].clone())
     }
 }
diff --git a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs 
b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs
index ea726067..c9cb564c 100644
--- a/native-engine/datafusion-ext-plans/src/agg/maxmin.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/maxmin.rs
@@ -30,7 +30,7 @@ use crate::{
         Agg,
         acc::{
             AccBooleanColumn, AccBytes, AccBytesColumn, AccColumn, 
AccColumnRef, AccPrimColumn,
-            AccScalarValueColumn, acc_generic_column_to_array, 
create_acc_generic_column,
+            AccScalarValueColumn, create_acc_generic_column,
         },
         agg::IdxSelection,
     },
@@ -43,14 +43,17 @@ pub type AggMin = AggMaxMin<AggMinParams>;
 pub struct AggMaxMin<P: AggMaxMinParams> {
     child: PhysicalExprRef,
     data_type: DataType,
+    acc_array_data_types: Vec<DataType>,
     _phantom: PhantomData<P>,
 }
 
 impl<P: AggMaxMinParams> AggMaxMin<P> {
     pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> 
Result<Self> {
+        let acc_array_data_types = vec![data_type.clone()];
         Ok(Self {
             child,
             data_type,
+            acc_array_data_types,
             _phantom: Default::default(),
         })
     }
@@ -87,7 +90,11 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> {
     }
 
     fn create_acc_column(&self, num_rows: usize) -> AccColumnRef {
-        create_acc_generic_column(&self.data_type, num_rows)
+        create_acc_generic_column(self.data_type.clone(), num_rows)
+    }
+
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &self.acc_array_data_types
     }
 
     fn partial_update(
@@ -286,7 +293,7 @@ impl<P: AggMaxMinParams> Agg for AggMaxMin<P> {
     }
 
     fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) 
-> Result<ArrayRef> {
-        acc_generic_column_to_array(accs, &self.data_type, acc_idx)
+        Ok(accs.freeze_to_arrays(acc_idx)?[0].clone())
     }
 }
 
diff --git a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs 
b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
index 15dd82bf..55ecd7cd 100644
--- a/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/spark_udaf_wrapper.rs
@@ -21,7 +21,9 @@ use std::{
 };
 
 use arrow::{
-    array::{Array, ArrayRef, StructArray, as_struct_array, make_array},
+    array::{
+        Array, ArrayRef, BinaryArray, BinaryBuilder, StructArray, 
as_struct_array, make_array,
+    },
     datatypes::{DataType, Field, Schema, SchemaRef},
     ffi::{FFI_ArrowArray, FFI_ArrowSchema, from_ffi},
     record_batch::{RecordBatch, RecordBatchOptions},
@@ -238,6 +240,10 @@ impl Agg for SparkUDAFWrapper {
         Box::new(AccUDAFBufferRowsColumn { obj, jcontext })
     }
 
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &[DataType::Binary]
+    }
+
     fn with_new_exprs(&self, _exprs: Vec<PhysicalExprRef>) -> Result<Arc<dyn 
Agg>> {
         Ok(Arc::new(Self::try_new(
             self.serialized.clone(),
@@ -289,12 +295,11 @@ pub struct AccUDAFBufferRowsColumn {
 }
 
 impl AccUDAFBufferRowsColumn {
-    pub fn freeze_to_rows_with_indices_cache(
+    pub fn freeze_to_array_with_indices_cache(
         &self,
         idx: IdxSelection<'_>,
-        array: &mut [Vec<u8>],
         cache: &OnceCell<LocalRef>,
-    ) -> Result<()> {
+    ) -> Result<ArrayRef> {
         let idx_array =
             cache.get_or_try_init(move || jni_new_prim_array!(int, 
&idx.to_int32_vec()[..]))?;
         let serialized = jni_call!(
@@ -307,15 +312,18 @@ impl AccUDAFBufferRowsColumn {
         jni_get_byte_array_region!(serialized.as_obj(), 0, &mut 
serialized_bytes[..])?;
 
         // UnsafeRow is serialized with big-endian i32 length prefix
-        let mut cursor = Cursor::new(&serialized_bytes);
-        for i in 0..array.len() {
-            let mut bytes_len_buf = [0; 4];
-            cursor.read_exact(&mut bytes_len_buf)?;
+        let mut serialized_pos = 0;
+        let mut binary_builder = BinaryBuilder::with_capacity(idx.len(), 0);
+        for i in 0..idx.len() {
+            let mut bytes_len_buf = [0u8; 4];
+            
bytes_len_buf.copy_from_slice(&serialized_bytes[serialized_pos..][..4]);
             let bytes_len = i32::from_be_bytes(bytes_len_buf) as usize;
-            write_len(bytes_len, &mut array[i])?;
-            std::io::copy(&mut (&mut cursor).take(bytes_len as u64), &mut 
array[i])?;
+            serialized_pos += 4;
+
+            
binary_builder.append_value(&serialized_bytes[serialized_pos..][..bytes_len]);
+            serialized_pos += bytes_len;
         }
-        Ok(())
+        Ok(Arc::new(binary_builder.finish()))
     }
 
     pub fn spill_with_indices_cache(
@@ -389,12 +397,20 @@ impl AccColumn for AccUDAFBufferRowsColumn {
         0 // memory is managed in jvm side
     }
 
-    fn freeze_to_rows(&self, idx: IdxSelection<'_>, array: &mut [Vec<u8>]) -> 
Result<()> {
-        self.freeze_to_rows_with_indices_cache(idx, array, &OnceCell::new())
+    fn freeze_to_arrays(&mut self, idx: IdxSelection<'_>) -> 
Result<Vec<ArrayRef>> {
+        let array = self.freeze_to_array_with_indices_cache(idx, 
&OnceCell::new())?;
+        Ok(vec![array])
     }
 
-    fn unfreeze_from_rows(&mut self, cursors: &mut [Cursor<&[u8]>]) -> 
Result<()> {
+    fn unfreeze_from_arrays(&mut self, arrays: &[ArrayRef]) -> Result<()> {
         assert_eq!(self.num_records(), 0, "expect empty AccColumn");
+        let array = downcast_any!(arrays[0], BinaryArray)?;
+
+        // freeze_to_arrays now stores raw row bytes, but the loop below still
+        // reads the legacy length-prefixed format via read_len(); rebuild the
+        // prefixed layout so the existing decoding logic stays correct.
+        let mut prefixed: Vec<Vec<u8>> = Vec::with_capacity(array.len());
+        for i in 0..array.len() {
+            let mut buf = vec![];
+            write_len(array.value(i).len(), &mut buf)?;
+            buf.extend_from_slice(array.value(i));
+            prefixed.push(buf);
+        }
+        let mut cursors: Vec<Cursor<&[u8]>> =
+            prefixed.iter().map(|buf| Cursor::new(&buf[..])).collect();
+
         let mut data = vec![];
         for cursor in cursors.iter_mut() {
             let bytes_len = read_len(cursor)?;
@@ -414,7 +430,7 @@ impl AccColumn for AccUDAFBufferRowsColumn {
         Ok(())
     }
 
-    fn spill(&self, _idx: IdxSelection<'_>, _buf: &mut SpillCompressedWriter) 
-> Result<()> {
+    fn spill(&mut self, _idx: IdxSelection<'_>, _buf: &mut 
SpillCompressedWriter) -> Result<()> {
         unimplemented!("should call spill_with_indices_cache instead")
     }
 
diff --git a/native-engine/datafusion-ext-plans/src/agg/sum.rs 
b/native-engine/datafusion-ext-plans/src/agg/sum.rs
index 7bae0e11..0b60107a 100644
--- a/native-engine/datafusion-ext-plans/src/agg/sum.rs
+++ b/native-engine/datafusion-ext-plans/src/agg/sum.rs
@@ -26,9 +26,7 @@ use datafusion_ext_commons::{df_unimplemented_err, 
downcast_any};
 use crate::{
     agg::{
         Agg,
-        acc::{
-            AccColumnRef, AccPrimColumn, acc_generic_column_to_array, 
create_acc_generic_column,
-        },
+        acc::{AccColumnRef, AccPrimColumn, create_acc_generic_column},
         agg::IdxSelection,
     },
     idx_for_zipped,
@@ -37,11 +35,17 @@ use crate::{
 pub struct AggSum {
     child: PhysicalExprRef,
     data_type: DataType,
+    acc_array_data_types: Vec<DataType>,
 }
 
 impl AggSum {
     pub fn try_new(child: PhysicalExprRef, data_type: DataType) -> 
Result<Self> {
-        Ok(Self { child, data_type })
+        let acc_array_data_types = vec![data_type.clone()];
+        Ok(Self {
+            child,
+            data_type,
+            acc_array_data_types,
+        })
     }
 }
 
@@ -84,7 +88,11 @@ impl Agg for AggSum {
     }
 
     fn create_acc_column(&self, num_rows: usize) -> AccColumnRef {
-        create_acc_generic_column(&self.data_type, num_rows)
+        create_acc_generic_column(self.data_type.clone(), num_rows)
+    }
+
+    fn acc_array_data_types(&self) -> &[DataType] {
+        &self.acc_array_data_types
     }
 
     fn partial_update(
@@ -145,6 +153,6 @@ impl Agg for AggSum {
     }
 
     fn final_merge(&self, accs: &mut AccColumnRef, acc_idx: IdxSelection<'_>) 
-> Result<ArrayRef> {
-        acc_generic_column_to_array(accs, &self.data_type, acc_idx)
+        Ok(accs.freeze_to_arrays(acc_idx)?[0].clone())
     }
 }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 4f124bd8..de0b6105 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -461,7 +461,7 @@ object AuronConverters extends Logging {
           assert(
             !exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
             s"Parquet scan with timestamp type is not supported for table: 
${tableIdentifier
-              .getOrElse("unknown")}. " +
+                .getOrElse("unknown")}. " +
               "Set spark.auron.enable.scan.parquet.timestamp=true to enable 
timestamp support " +
               "or remove timestamp columns from the query.")
         }
@@ -472,7 +472,7 @@ object AuronConverters extends Logging {
           assert(
             !exec.requiredSchema.exists(e => existTimestampType(e.dataType)),
             s"ORC scan with timestamp type is not supported for 
tableIdentifier: ${tableIdentifier
-              .getOrElse("unknown")}. " +
+                .getOrElse("unknown")}. " +
               "Set spark.auron.enable.scan.orc.timestamp=true to enable 
timestamp support " +
               "or remove timestamp columns from the query.")
         }
@@ -480,7 +480,7 @@ object AuronConverters extends Logging {
       case p =>
         throw new NotImplementedError(
           s"Cannot convert FileSourceScanExec tableIdentifier: 
${tableIdentifier.getOrElse(
-            "unknown")}, class: ${p.getClass.getName}")
+              "unknown")}, class: ${p.getClass.getName}")
     }
   }
 
@@ -836,8 +836,7 @@ object AuronConverters extends Logging {
           addRenameColumnsExec(convertToNative(exec.child))
         case _ =>
           if (needRenameColumns(exec.child)) {
-            val newNames = 
exec.groupingExpressions.map(Util.getFieldNameByExprId) :+
-              NativeAggBase.AGG_BUF_COLUMN_NAME
+            val newNames = 
exec.groupingExpressions.map(Util.getFieldNameByExprId)
             
Shims.get.createNativeRenameColumnsExec(convertToNative(exec.child), newNames)
           } else {
             convertToNative(exec.child)
@@ -893,8 +892,7 @@ object AuronConverters extends Logging {
           addRenameColumnsExec(convertToNative(exec.child))
         case _ =>
           if (needRenameColumns(exec.child)) {
-            val newNames = 
exec.groupingExpressions.map(Util.getFieldNameByExprId) :+
-              NativeAggBase.AGG_BUF_COLUMN_NAME
+            val newNames = 
exec.groupingExpressions.map(Util.getFieldNameByExprId)
             
Shims.get.createNativeRenameColumnsExec(convertToNative(exec.child), newNames)
           } else {
             convertToNative(exec.child)
@@ -947,8 +945,7 @@ object AuronConverters extends Logging {
           addRenameColumnsExec(convertToNative(child))
         case _ =>
           if (needRenameColumns(child)) {
-            val newNames = 
exec.groupingExpressions.map(Util.getFieldNameByExprId) :+
-              NativeAggBase.AGG_BUF_COLUMN_NAME
+            val newNames = 
exec.groupingExpressions.map(Util.getFieldNameByExprId)
             Shims.get.createNativeRenameColumnsExec(convertToNative(child), 
newNames)
           } else {
             convertToNative(child)
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
index 1f8b6421..9944ea5b 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala
@@ -96,7 +96,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) 
extends ColumnarRu
         dumpSimpleSparkPlanTreeNode(sparkPlanTransformed)
 
         logInfo(s"Transformed spark plan after 
preColumnarTransitions:\n${sparkPlanTransformed
-          .treeString(verbose = true, addSuffix = true)}")
+            .treeString(verbose = true, addSuffix = true)}")
 
         // post-transform
         Shims.get.postTransform(sparkPlanTransformed, 
sparkSession.sparkContext)
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
index 3ae7669e..403008e6 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala
@@ -74,7 +74,7 @@ object NativeHelper extends Logging {
     val heapMemory = Runtime.getRuntime.maxMemory()
     val offheapMemory = totalMemory - heapMemory
     logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: 
${Utils.bytesToString(
-      heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}")
+        heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}")
     offheapMemory
   }
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala
index 8e5d7353..eecec5ef 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala
@@ -47,7 +47,7 @@ object TaskContextHelper extends Logging {
     val thread = Thread.currentThread()
     val threadName = if (context != null) {
       s"auron native task ${context.partitionId()}.${context.attemptNumber()} 
in stage ${context
-        .stageId()}.${context.stageAttemptNumber()} (TID 
${context.taskAttemptId()})"
+          .stageId()}.${context.stageAttemptNumber()} (TID 
${context.taskAttemptId()})"
     } else {
       "auron native task " + thread.getName
     }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
index 54e2cddc..292368b9 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggBase.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.auron.plan
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.immutable.SortedMap
-
 import org.apache.spark.OneToOneDependency
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.auron.NativeConverters
@@ -52,9 +51,7 @@ import 
org.apache.spark.sql.execution.auron.plan.NativeAggBase.SortAgg
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.BinaryType
-import org.apache.spark.sql.types.DataType
-
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, LongType}
 import org.apache.auron.{protobuf => pb}
 import org.apache.auron.jni.AuronAdaptor
 import org.apache.auron.metric.SparkMetricNode
@@ -146,9 +143,10 @@ abstract class NativeAggBase(
     if (nativeAggrModes.contains(pb.AggMode.FINAL)) {
       groupingExpressions.map(_.toAttribute) ++ aggregateAttributes
     } else {
-      groupingExpressions.map(_.toAttribute) :+
-        AttributeReference(NativeAggBase.AGG_BUF_COLUMN_NAME, BinaryType, 
nullable = false)(
-          ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID))
+      val aggBufferAttrs = nativeAggrInfos
+        .flatMap(_.aggBufferDataTypes)
+        .map(AttributeReference("", _, nullable = false)(ExprId.apply(0)))
+      groupingExpressions.map(_.toAttribute) ++ aggBufferAttrs
     }
 
   override def outputPartitioning: Partitioning =
@@ -217,30 +215,14 @@ object NativeAggBase extends Logging {
   case object HashAgg extends AggExecMode
   case object SortAgg extends AggExecMode
 
-  case class NativeAggrPartialState(stateAttr: Attribute, arrowType: 
pb.ArrowType)
-
-  object NativeAggrPartialState {
-    def apply(
-        aggrAttr: Attribute,
-        stateFieldName: String,
-        dataType: DataType,
-        nullable: Boolean,
-        arrowType: pb.ArrowType = null): NativeAggrPartialState = {
-
-      val fieldName = 
s"${Util.getFieldNameByExprId(aggrAttr)}[$stateFieldName]"
-      val stateAttr = AttributeReference(fieldName, dataType, 
nullable)(aggrAttr.exprId)
-      NativeAggrPartialState(
-        stateAttr,
-        arrowType = 
Option(arrowType).getOrElse(NativeConverters.convertDataType(dataType)))
-    }
-  }
-
   case class NativeAggrInfo(
       mode: AggregateMode,
       nativeAggrs: Seq[pb.PhysicalExprNode],
+      aggBufferDataTypes: Seq[DataType],
       outputAttr: Attribute)
 
   def getNativeAggrInfo(aggr: AggregateExpression, aggrAttr: Attribute): 
NativeAggrInfo = {
+    val aggBufferDataTypes = computeNativeAggBufferDataTypes(aggr.aggregateFunction)
     val reducedAggr = AggregateExpression(
       aggr.aggregateFunction
         .mapChildren(e => createPlaceholder(e))
@@ -253,12 +235,13 @@ object NativeAggBase extends Logging {
 
     aggr.mode match {
       case Partial =>
-        NativeAggrInfo(aggr.mode, NativeConverters.convertAggregateExpr(aggr) 
:: Nil, outputAttr)
+        NativeAggrInfo(aggr.mode, NativeConverters.convertAggregateExpr(aggr) 
:: Nil, aggBufferDataTypes, outputAttr)
 
       case PartialMerge | Final =>
         NativeAggrInfo(
           aggr.mode,
           NativeConverters.convertAggregateExpr(reducedAggr) :: Nil,
+          aggBufferDataTypes,
           outputAttr)
 
       case Complete =>
@@ -310,4 +293,17 @@ object NativeAggBase extends Logging {
     }
     findRecursive(exec.children.head)
   }
+
+  def computeNativeAggBufferDataTypes(aggr: AggregateFunction): Seq[DataType] 
= {
+    aggr match {
+      case _: Count => Seq(LongType)
+      case f: Max => Seq(f.dataType)
+      case f: Min => Seq(f.dataType)
+      case f: Sum => Seq(f.dataType)
+      case f: Average => Seq(f.dataType, LongType)
+      case f@First(_, true) => Seq(f.dataType)
+      case f@First(_, false) => Seq(f.dataType, BooleanType)
+      case _ => Seq(BinaryType)
+    }
+  }
 }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
index d43f7d17..b9fb7f99 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala
@@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase(
         .filterKeys(Set("stage_id", "output_rows", "elapsed_compute"))
         .toSeq
         :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, 
"Native.io_time"))
-        :+ ("bytes_written",
-        SQLMetrics
-          .createSizeMetric(sparkContext, "Native.bytes_written")): _*)
+        :+ (
+          "bytes_written",
+          SQLMetrics
+            .createSizeMetric(sparkContext, "Native.bytes_written")): _*)
 
   def check(): Unit = {
     val hadoopConf = sparkContext.hadoopConfiguration

Reply via email to