This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new c429beabf fix: Improve logic for determining when an UnpackOrDeepCopy is needed (#2142)
c429beabf is described below

commit c429beabf082b48a8635390b5dc666338a7c074d
Author: Andy Grove <[email protected]>
AuthorDate: Wed Aug 13 15:52:30 2025 -0600

    fix: Improve logic for determining when an UnpackOrDeepCopy is needed (#2142)
---
 native/core/src/execution/operators/copy.rs |  8 ++++++++
 native/core/src/execution/planner.rs        | 19 ++++++++++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/native/core/src/execution/operators/copy.rs b/native/core/src/execution/operators/copy.rs
index a809031d5..f1e87c2e0 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -91,6 +91,14 @@ impl CopyExec {
             mode,
         }
     }
+
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    pub fn mode(&self) -> &CopyMode {
+        &self.mode
+    }
 }
 
 impl DisplayAs for CopyExec {
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 600c1ef2e..8533dae21 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2610,10 +2610,23 @@ impl From<ExpressionError> for DataFusionError {
 /// modification. This is used to determine if we need to copy the input batch to avoid
 /// data corruption from reusing the input batch.
 fn can_reuse_input_batch(op: &Arc<dyn ExecutionPlan>) -> bool {
-    if op.as_any().is::<ProjectionExec>() || op.as_any().is::<LocalLimitExec>() {
-        can_reuse_input_batch(op.children()[0])
+    if op.as_any().is::<ScanExec>() {
+        // JVM side can return arrow buffers to the pool
+        // Also, native_comet scan reuses mutable buffers
+        true
+    } else if op.as_any().is::<CopyExec>() {
+        let copy_exec = op.as_any().downcast_ref::<CopyExec>().unwrap();
+        copy_exec.mode() == &CopyMode::UnpackOrClone && can_reuse_input_batch(copy_exec.input())
+    } else if op.as_any().is::<CometFilterExec>() {
+        // CometFilterExec guarantees that all arrays have been copied
+        false
     } else {
-        op.as_any().is::<ScanExec>()
+        for child in op.children() {
+            if can_reuse_input_batch(child) {
+                return true;
+            }
+        }
+        false
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to