This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new c429beabf fix: Improve logic for determining when an UnpackOrDeepCopy is needed (#2142)
c429beabf is described below
commit c429beabf082b48a8635390b5dc666338a7c074d
Author: Andy Grove <[email protected]>
AuthorDate: Wed Aug 13 15:52:30 2025 -0600
fix: Improve logic for determining when an UnpackOrDeepCopy is needed (#2142)
---
native/core/src/execution/operators/copy.rs | 8 ++++++++
native/core/src/execution/planner.rs | 19 ++++++++++++++++---
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index a809031d5..f1e87c2e0 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -91,6 +91,14 @@ impl CopyExec {
mode,
}
}
+
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ pub fn mode(&self) -> &CopyMode {
+ &self.mode
+ }
}
impl DisplayAs for CopyExec {
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 600c1ef2e..8533dae21 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2610,10 +2610,23 @@ impl From<ExpressionError> for DataFusionError {
/// modification. This is used to determine if we need to copy the input batch to avoid
/// data corruption from reusing the input batch.
fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
- if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
- can_reuse_input_batch(op.children()[0])
+ if op.as_any().is::<ScanExec>() {
+ // JVM side can return arrow buffers to the pool
+ // Also, native_comet scan reuses mutable buffers
+ true
+ } else if op.as_any().is::<CopyExec>() {
+ let copy_exec = op.as_any().downcast_ref::<CopyExec>().unwrap();
+ copy_exec.mode() == &CopyMode::UnpackOrClone && can_reuse_input_batch(copy_exec.input())
+ } else if op.as_any().is::<CometFilterExec>() {
+ // CometFilterExec guarantees that all arrays have been copied
+ false
} else {
- op.as_any().is::<ScanExec>()
+ for child in op.children() {
+ if can_reuse_input_batch(child) {
+ return true;
+ }
+ }
+ false
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]