This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new acb78280 perf: Remove some redundant copying of batches (#816)
acb78280 is described below
commit acb78280e495079312a1067b80e91671852389a5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Aug 13 13:21:13 2024 -0600
perf: Remove some redundant copying of batches (#816)
---
native/core/src/execution/datafusion/planner.rs | 27 +++++---
native/core/src/execution/operators/copy.rs | 88 ++++++++++++++++++++++---
native/core/src/execution/operators/mod.rs | 62 +----------------
3 files changed, 100 insertions(+), 77 deletions(-)
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index c3aad61e..122a24ed 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -19,6 +19,7 @@
use super::expressions::EvalMode;
use
crate::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun;
+use crate::execution::operators::CopyMode;
use crate::{
errors::ExpressionError,
execution::{
@@ -859,7 +860,11 @@ impl PhysicalPlanner {
let fetch = sort.fetch.map(|num| num as usize);
- let copy_exec = Arc::new(CopyExec::new(child));
+ let copy_exec = if can_reuse_input_batch(&child) {
+ Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
+ } else {
+ Arc::new(CopyExec::new(child, CopyMode::UnpackOrClone))
+ };
Ok((
scans,
@@ -949,8 +954,8 @@ impl PhysicalPlanner {
// the data corruption. Note that we only need to copy the
input batch
// if the child operator is `ScanExec`, because other
operators after `ScanExec`
// will create new arrays for the output batch.
- let child = if child.as_any().is::<ScanExec>() {
- Arc::new(CopyExec::new(child))
+ let child = if can_reuse_input_batch(&child) {
+ Arc::new(CopyExec::new(child, CopyMode::UnpackOrDeepCopy))
} else {
child
};
@@ -1205,15 +1210,15 @@ impl PhysicalPlanner {
// to copy the input batch to avoid the data corruption from reusing
the input
// batch.
let left = if can_reuse_input_batch(&left) {
- Arc::new(CopyExec::new(left))
+ Arc::new(CopyExec::new(left, CopyMode::UnpackOrDeepCopy))
} else {
- left
+ Arc::new(CopyExec::new(left, CopyMode::UnpackOrClone))
};
let right = if can_reuse_input_batch(&right) {
- Arc::new(CopyExec::new(right))
+ Arc::new(CopyExec::new(right, CopyMode::UnpackOrDeepCopy))
} else {
- right
+ Arc::new(CopyExec::new(right, CopyMode::UnpackOrClone))
};
Ok((
@@ -1775,10 +1780,14 @@ impl From<ExpressionError> for DataFusionError {
/// modification. This is used to determine if we need to copy the input batch
to avoid
/// data corruption from reusing the input batch.
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
- op.as_any().is::<ScanExec>()
+ if op.as_any().is::<ProjectionExec>()
|| op.as_any().is::<LocalLimitExec>()
- || op.as_any().is::<ProjectionExec>()
|| op.as_any().is::<FilterExec>()
+ {
+ can_reuse_input_batch(op.children()[0])
+ } else {
+ op.as_any().is::<ScanExec>()
+ }
}
/// Collects the indices of the columns in the input schema that are used in
the expression
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index b5b1491e..0705a3b7 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::compute::{cast_with_options, CastOptions};
+use futures::{Stream, StreamExt};
use std::{
any::Any,
pin::Pin,
@@ -22,17 +24,16 @@ use std::{
task::{Context, Poll},
};
-use futures::{Stream, StreamExt};
-
-use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions};
-use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef};
+use arrow_array::{
+ downcast_dictionary_array, make_array, Array, ArrayRef, RecordBatch,
RecordBatchOptions,
+};
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaRef};
use datafusion::physical_plan::metrics::{BaselineMetrics,
ExecutionPlanMetricsSet, MetricsSet};
use datafusion::{execution::TaskContext, physical_expr::*, physical_plan::*};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as
DataFusionResult};
-use super::copy_or_cast_array;
-
/// An utility execution node which makes deep copies of input batches.
///
/// In certain scenarios like sort, DF execution nodes only make shallow copy
of input batches.
@@ -44,10 +45,20 @@ pub struct CopyExec {
schema: SchemaRef,
cache: PlanProperties,
metrics: ExecutionPlanMetricsSet,
+ mode: CopyMode,
+}
+
+/// How `CopyExec` treats each input column. Dictionary arrays are unpacked
+/// (cast to their value type) in both modes; the modes differ only for
+/// non-dictionary arrays.
+#[derive(Debug, PartialEq, Clone)]
+pub enum CopyMode {
+ /// Deep-copy non-dictionary arrays (via `copy_array`).
+ UnpackOrDeepCopy,
+ /// Pass non-dictionary arrays through via `Arc::clone`, sharing their data.
+ UnpackOrClone,
}
impl CopyExec {
- pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+ pub fn new(input: Arc<dyn ExecutionPlan>, mode: CopyMode) -> Self {
+ // change schema to remove dictionary types because CopyExec always
unpacks
+ // dictionaries
+
let fields: Vec<Field> = input
.schema()
.fields
@@ -73,6 +84,7 @@ impl CopyExec {
schema,
cache,
metrics: ExecutionPlanMetricsSet::default(),
+ mode,
}
}
}
@@ -81,7 +93,7 @@ impl DisplayAs for CopyExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) ->
std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(f, "CopyExec")
+ write!(f, "CopyExec [{:?}]", self.mode)
}
}
}
@@ -111,6 +123,7 @@ impl ExecutionPlan for CopyExec {
schema: self.schema.clone(),
cache: self.cache.clone(),
metrics: self.metrics.clone(),
+ mode: self.mode.clone(),
}))
}
@@ -125,6 +138,7 @@ impl ExecutionPlan for CopyExec {
self.schema(),
child_stream,
partition,
+ self.mode.clone(),
)))
}
@@ -149,6 +163,7 @@ struct CopyStream {
schema: SchemaRef,
child_stream: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
+ mode: CopyMode,
}
impl CopyStream {
@@ -157,11 +172,13 @@ impl CopyStream {
schema: SchemaRef,
child_stream: SendableRecordBatchStream,
partition: usize,
+ mode: CopyMode,
) -> Self {
Self {
schema,
child_stream,
baseline_metrics: BaselineMetrics::new(&exec.metrics, partition),
+ mode,
}
}
@@ -172,7 +189,7 @@ impl CopyStream {
let vectors = batch
.columns()
.iter()
- .map(|v| copy_or_cast_array(v))
+ .map(|v| copy_or_unpack_array(v, &self.mode))
.collect::<Result<Vec<ArrayRef>, _>>()?;
let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
@@ -200,3 +217,56 @@ impl RecordBatchStream for CopyStream {
self.schema.clone()
}
}
+
+/// Deep-copy an Arrow array by extending a fresh `MutableArrayData` with all of its rows; for a dictionary array, its values array is copied the same way.
+fn copy_array(array: &dyn Array) -> ArrayRef {
+ let capacity = array.len();
+ let data = array.to_data();
+
+ let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
+
+ mutable.extend(0, 0, capacity);
+
+ if matches!(array.data_type(), DataType::Dictionary(_, _)) {
+ let copied_dict = make_array(mutable.freeze());
+ let ref_copied_dict = &copied_dict;
+
+ downcast_dictionary_array!(
+ ref_copied_dict => {
+ // Copying dictionary value array
+ let values = ref_copied_dict.values();
+ let data = values.to_data();
+
+ let mut mutable = MutableArrayData::new(vec![&data], false,
values.len());
+ mutable.extend(0, 0, values.len());
+
+ let copied_dict =
ref_copied_dict.with_values(make_array(mutable.freeze()));
+ Arc::new(copied_dict)
+ }
+ t => unreachable!("Should not reach here: {}", t)
+ )
+ } else {
+ make_array(mutable.freeze())
+ }
+}
+
+/// Unpack a dictionary array, or copy/clone any other array according to `mode`.
+/// Used by `CopyStream` for each input column. A dictionary array is always cast
+/// to its value type, and the cast result is returned without a further copy.
+/// Any other array is deep-copied via `copy_array` under `CopyMode::UnpackOrDeepCopy`,
+/// or returned via `Arc::clone` (sharing its data) under `CopyMode::UnpackOrClone`.
+fn copy_or_unpack_array(array: &Arc<dyn Array>, mode: &CopyMode) ->
Result<ArrayRef, ArrowError> {
+ match array.data_type() {
+ DataType::Dictionary(_, value_type) => {
+ let options = CastOptions::default();
+ cast_with_options(array, value_type.as_ref(), &options)
+ }
+ _ => {
+ if mode == &CopyMode::UnpackOrDeepCopy {
+ Ok(copy_array(array))
+ } else {
+ Ok(Arc::clone(array))
+ }
+ }
+ }
+}
diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs
index d0cc7ac6..09e05ef2 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -17,22 +17,15 @@
//! Operators
-use arrow::{
- array::{make_array, Array, ArrayRef, MutableArrayData},
- datatypes::DataType,
- downcast_dictionary_array,
-};
+use std::fmt::Debug;
-use arrow::compute::{cast_with_options, CastOptions};
-use arrow_schema::ArrowError;
use jni::objects::GlobalRef;
-use std::{fmt::Debug, sync::Arc};
-mod scan;
+pub use copy::*;
pub use scan::*;
mod copy;
-pub use copy::*;
+mod scan;
/// Error returned during executing operators.
#[derive(thiserror::Error, Debug)]
@@ -61,52 +54,3 @@ pub enum ExecutionError {
throwable: GlobalRef,
},
}
-
-/// Copy an Arrow Array
-pub fn copy_array(array: &dyn Array) -> ArrayRef {
- let capacity = array.len();
- let data = array.to_data();
-
- let mut mutable = MutableArrayData::new(vec![&data], false, capacity);
-
- mutable.extend(0, 0, capacity);
-
- if matches!(array.data_type(), DataType::Dictionary(_, _)) {
- let copied_dict = make_array(mutable.freeze());
- let ref_copied_dict = &copied_dict;
-
- downcast_dictionary_array!(
- ref_copied_dict => {
- // Copying dictionary value array
- let values = ref_copied_dict.values();
- let data = values.to_data();
-
- let mut mutable = MutableArrayData::new(vec![&data], false,
values.len());
- mutable.extend(0, 0, values.len());
-
- let copied_dict =
ref_copied_dict.with_values(make_array(mutable.freeze()));
- Arc::new(copied_dict)
- }
- t => unreachable!("Should not reach here: {}", t)
- )
- } else {
- make_array(mutable.freeze())
- }
-}
-
-/// Copy an Arrow Array or cast to primitive type if it is a dictionary array.
-/// This is used for `CopyExec` to copy/cast the input array. If the input
array
-/// is a dictionary array, we will cast the dictionary array to primitive type
-/// (i.e., unpack the dictionary array) and copy the primitive array. If the
input
-/// array is a primitive array, we simply copy the array.
-pub fn copy_or_cast_array(array: &dyn Array) -> Result<ArrayRef, ArrowError> {
- match array.data_type() {
- DataType::Dictionary(_, value_type) => {
- let options = CastOptions::default();
- let casted = cast_with_options(array, value_type.as_ref(),
&options);
-
- casted.and_then(|a| copy_or_cast_array(a.as_ref()))
- }
- _ => Ok(copy_array(array)),
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]