This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d57136cf3cf5 [SPARK-54673][SQL] Refactor SQL pipe syntax analysis code for sharing and reuse
d57136cf3cf5 is described below

commit d57136cf3cf5725e17e6202cc8a91a5cdda170cc
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Thu Dec 11 11:27:41 2025 -0800

    [SPARK-54673][SQL] Refactor SQL pipe syntax analysis code for sharing and reuse
    
    ### What changes were proposed in this pull request?
    
    This PR adds a small refactor of the SQL pipe syntax analysis code for sharing and reuse.
    
    ### Why are the changes needed?
    
    The new analyzer can refer to the refactored code without duplicating it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    N/A, this is a small refactoring only.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53431 from dtenedor/refactor-pipe-syntax.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../sql/catalyst/expressions/pipeOperators.scala   | 52 +++++++++++++++-------
 1 file changed, 35 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
index b2bb949c9e5e..cfbd403d66fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/pipeOperators.scala
@@ -71,26 +71,44 @@ case object ValidateAndStripPipeExpressions extends Rule[LogicalPlan] {
     case node: LogicalPlan =>
       node.resolveExpressions {
         case p: PipeExpression if p.child.resolved =>
-          // Once the child expression is resolved, we can perform the 
necessary invariant checks
-          // and then remove this expression, replacing it with the child 
expression instead.
-          val firstAggregateFunction: Option[AggregateFunction] = 
findFirstAggregate(p.child)
-          if (p.isAggregate && firstAggregateFunction.isEmpty) {
-            throw QueryCompilationErrors
-              
.pipeOperatorAggregateExpressionContainsNoAggregateFunction(p.child)
-          } else if (!p.isAggregate) {
-            // For non-aggregate clauses, only allow aggregate functions in 
SELECT.
-            // All other clauses (EXTEND, SET, etc.) disallow aggregates.
-            val aggregateAllowed = p.clause == PipeOperators.selectClause
-            if (!aggregateAllowed) {
-              firstAggregateFunction.foreach { a =>
-                throw 
QueryCompilationErrors.pipeOperatorContainsAggregateFunction(a, p.clause)
-              }
-            }
-          }
-          p.child
+          validateAndStripPipeExpression(p, p.child)
       }
   }
 
+  /**
+   * Validates aggregate function constraints for a [[PipeExpression]] and 
returns the resolved
+   * child expression (stripping the [[PipeExpression]] wrapper).
+   *
+   * This method is shared between the fixed-point analyzer rule and the 
single-pass resolver.
+   *
+   * @param pipeExpression The [[PipeExpression]] containing metadata about 
the pipe clause.
+   * @param resolvedChild The resolved child expression to validate and return.
+   * @return The resolved child expression after validation.
+   */
+  def validateAndStripPipeExpression(
+      pipeExpression: PipeExpression,
+      resolvedChild: Expression): Expression = {
+    val firstAggregateFunction: Option[AggregateFunction] = 
findFirstAggregate(resolvedChild)
+    if (pipeExpression.isAggregate && firstAggregateFunction.isEmpty) {
+      throw QueryCompilationErrors
+          
.pipeOperatorAggregateExpressionContainsNoAggregateFunction(resolvedChild)
+    }
+    if (!pipeExpression.isAggregate) {
+      // For non-aggregate clauses, only allow aggregate functions in SELECT.
+      // All other clauses (EXTEND, SET, etc.) disallow aggregates.
+      val aggregateAllowed = pipeExpression.clause == 
PipeOperators.selectClause
+      if (!aggregateAllowed) {
+        firstAggregateFunction.foreach { a =>
+          throw QueryCompilationErrors.pipeOperatorContainsAggregateFunction(
+            a,
+            pipeExpression.clause
+          )
+        }
+      }
+    }
+    resolvedChild
+  }
+
   /** Returns the first aggregate function in the given expression, or None if 
not found. */
   private def findFirstAggregate(e: Expression): Option[AggregateFunction] = e 
match {
     case a: AggregateFunction =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to