gstvg commented on code in PR #22853:
URL: https://github.com/apache/datafusion/pull/22853#discussion_r3471937812


##########
datafusion/expr/src/higher_order_function.rs:
##########
@@ -257,26 +266,64 @@ pub struct LambdaArgument {
 }
 
 impl LambdaArgument {
+    /// Build a [`LambdaArgument`] that treats every declared parameter as
+    /// used. This is the backwards-compatible behavior. Prefer
+    /// [`Self::new_with_used_params`] when the caller knows which subset of
+    /// the lambda's parameters the body actually references — otherwise the
+    /// merged batch will still contain columns for unused parameters.
     pub fn new(
         params: Vec<FieldRef>,
         body: Arc<dyn PhysicalExpr>,
         captures: Option<RecordBatch>,
     ) -> Self {
+        Self::new_with_used_params(params, body, captures, None)
+    }
+
+    /// Build a [`LambdaArgument`] knowing which subset of `params` (by name)
+    /// the lambda body actually references.
+    ///
+    /// When `used_params` is `Some(set)`, [`Self::evaluate`] only evaluates
+    /// and pushes the closures whose corresponding parameter name is in
+    /// `set`, in the original declaration order of `params`. Unused declared
+    /// parameters leave no slot in the merged batch, so the body's compressed
+    /// column indices line up directly. When `used_params` is `None`,
+    /// behavior is identical to [`Self::new`].
+    pub fn new_with_used_params(
+        params: Vec<FieldRef>,
+        body: Arc<dyn PhysicalExpr>,
+        captures: Option<RecordBatch>,
+        used_params: Option<HashSet<String>>,
+    ) -> Self {
+        let used_param_indices = used_params.map(|set| {
+            params
+                .iter()
+                .enumerate()
+                .filter(|(_, f)| set.contains(f.name()))
+                .map(|(i, _)| i)
+                .collect::<Vec<_>>()
+        });
+
+        let effective_params: Vec<FieldRef> = match &used_param_indices {
+            Some(indices) => indices.iter().map(|i| Arc::clone(&params[*i])).collect(),
+            None => params.clone(),
+        };
+
         let fields = match &captures {
             Some(batch) => batch
                 .schema_ref()
                 .fields()
                 .iter()
                 .cloned()
-                .chain(params.clone())
+                .chain(effective_params.iter().cloned())

Review Comment:
   nit: can't we use `.chain(effective_params)` directly?



##########
datafusion/physical-expr/src/expressions/lambda.rs:
##########
@@ -149,6 +161,66 @@ impl LambdaExpr {
     pub(crate) fn projected_body(&self) -> &Arc<dyn PhysicalExpr> {
         &self.projected_body
     }
+
+    /// Subset of [`params`](Self::params) (by name) that the body actually
+    /// references, taking nested-lambda shadowing into account. Used by the
+    /// higher-order function evaluator to skip evaluating/pushing parameters
+    /// the lambda body does not need, so that unused declared parameters do
+    /// not shift the merged batch's column positions out of sync with the
+    /// body's compressed indices.
+    pub fn used_params(&self) -> &HashSet<String> {
+        &self.used_params
+    }
+}
+
+/// Walk the body collecting:
+///
+/// * `used_indices` — every `Column` / `LambdaVariable` index referenced
+///   anywhere in the tree (including inside nested lambdas). This drives the
+///   `projection` used to slice the outer batch.
+/// * `used_param_names` — the subset of *this* lambda's `own_params` that the
+///   body actually references, with nested-lambda parameters shadowing the
+///   outer ones. For example, in `(k, v) -> func(col, (k, v2) -> k + v2 + v)`
+///   the inner `k` shadows the outer `k`, so only `v` flows up as used.
+fn collect_used(
+    node: &Arc<dyn PhysicalExpr>,
+    own_params: &HashSet<String>,
+    used_indices: &mut HashSet<usize>,
+    used_param_names: &mut HashSet<String>,
+    shadow_stack: &mut Vec<HashSet<String>>,
+) {
+    if let Some(col) = node.downcast_ref::<Column>() {
+        used_indices.insert(col.index());
+    } else if let Some(var) = node.downcast_ref::<LambdaVariable>() {
+        used_indices.insert(var.index());
+
+        let name = var.name();
+        let shadowed = shadow_stack.iter().any(|frame| frame.contains(name));
+        if !shadowed && own_params.contains(name) {
+            used_param_names.insert(name.to_string());
+        }
+    }
+
+    let pushed = if let Some(nested) = node.downcast_ref::<LambdaExpr>() {
+        shadow_stack.push(nested.params.iter().cloned().collect());
+        true
+    } else {
+        false
+    };
+
+    for child in node.children() {
+        collect_used(
+            child,
+            own_params,
+            used_indices,
+            used_param_names,
+            shadow_stack,
+        );
+    }
+
+    if pushed {
+        shadow_stack.pop();
+    }
 }

Review Comment:
   style, non-blocking: these traversals can use `TreeNodeVisitor` instead of 
hand-rolled recursion — its `f_down`/`f_up` map directly to the push/pop this 
shadow stack needs on entering/leaving a nested lambda. IMHO it is easier to 
read and reason about, since it is widely used across the codebase alongside its 
rewriting counterpart `TreeNodeRewriter`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to