This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b08ac303faed [SPARK-53521] Refactor Star expression
b08ac303faed is described below

commit b08ac303faed596efca499136f68884064c62ee2
Author: Mikhail Nikoliukin <mikhail.nikoliu...@databricks.com>
AuthorDate: Tue Sep 9 11:33:05 2025 +0800

    [SPARK-53521] Refactor Star expression
    
    ### What changes were proposed in this pull request?
    
    Small refactor of the Star trait to make it compatible with the new 
single-pass Analyzer. Basically, remove `LogicalPlan` from the signature of 
core methods. This makes it possible to call them using `NameScope`.
    
    ### Why are the changes needed?
    
    To eventually support all types of star expressions in the single-pass 
analyzer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code v1.0.107
    
    Closes #52268 from mikhailnik-db/refactor-star-trait.
    
    Authored-by: Mikhail Nikoliukin <mikhail.nikoliu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +-
 .../sql/catalyst/analysis/resolver/NameScope.scala |  23 +--
 .../spark/sql/catalyst/analysis/unresolved.scala   | 158 +++++++++++++--------
 3 files changed, 109 insertions(+), 74 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1896a1c7ac27..69731cd9576f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1945,7 +1945,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
               f1.nameParts == Seq("count") &&
               f1.arguments.length == 1) {
             f1.arguments.foreach {
-              case u: UnresolvedStar if u.isQualifiedByTable(child, resolver) 
=>
+              case u: UnresolvedStar if u.isQualifiedByTable(child.output, 
resolver) =>
                 throw QueryCompilationErrors
                   
.singleTableStarInCountNotAllowedError(u.target.get.mkString("."))
               case _ => // do nothing
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
index 3ccae116cb18..ed731432e2a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
@@ -24,9 +24,10 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{
+  ExpandStarParameters,
   LiteralFunctionResolution,
   Resolver => NameComparator,
-  UnresolvedStar
+  Star
 }
 import org.apache.spark.sql.catalyst.expressions.{
   Alias,
@@ -323,7 +324,7 @@ class NameScope(
   }
 
   /**
-   * Expand the [[UnresolvedStar]]. The expected use case for this method is 
star expansion inside
+   * Expand the [[Star]]. The expected use case for this method is star 
expansion inside
    * [[Project]].
    *
    * Star without a target:
@@ -360,16 +361,18 @@ class NameScope(
    * SELECT concat_ws('', *) AS result FROM VALUES (1, 2, 3);
    * }}}
    *
-   * Also, see [[UnresolvedStarBase.expandStar]] for more details.
+   * Also, see [[Star.expandStar]] for more details.
    */
-  def expandStar(unresolvedStar: UnresolvedStar): Seq[NamedExpression] = {
+  def expandStar(unresolvedStar: Star): Seq[NamedExpression] = {
     unresolvedStar.expandStar(
-      childOperatorOutput = output,
-      childOperatorMetadataOutput = hiddenOutput,
-      resolve = (nameParts, comparator) => 
resolveNameInStarExpansion(nameParts, comparator),
-      suggestedAttributes = output,
-      resolver = nameComparator,
-      cleanupNestedAliasesDuringStructExpansion = true
+      ExpandStarParameters(
+        childOperatorOutput = output,
+        childOperatorMetadataOutput = hiddenOutput,
+        resolve = (nameParts, comparator) => 
resolveNameInStarExpansion(nameParts, comparator),
+        suggestedAttributes = output,
+        resolver = nameComparator,
+        cleanupNestedAliasesDuringStructExpansion = true
+      )
     )
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 253597574d9f..2e0dbe9b7f42 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -425,6 +425,26 @@ object UnresolvedFunction {
   }
 }
 
+/**
+ * Encapsulates the arguments needed for [[Star.expandStar]].
+ *
+ * @param childOperatorOutput The output attributes of the child operator
+ * @param childOperatorMetadataOutput The metadata output attributes of the 
child operator
+ * @param resolve A function to resolve the given name parts to an attribute
+ * @param suggestedAttributes A list of attributes that are suggested for 
expansion
+ * @param resolver The resolver used to match the name parts
+ * @param cleanupNestedAliasesDuringStructExpansion Whether to cleanup nested 
aliases during
+ *                                                  struct expansion
+ */
+case class ExpandStarParameters(
+    childOperatorOutput: Seq[Attribute],
+    childOperatorMetadataOutput: Seq[Attribute],
+    resolve: (Seq[String], Resolver) => Option[NamedExpression],
+    suggestedAttributes: Seq[Attribute],
+    resolver: Resolver,
+    cleanupNestedAliasesDuringStructExpansion: Boolean = false
+)
+
 /**
  * Represents all of the input attributes to a given relational operator, for 
example in
  * "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis.
@@ -440,7 +460,34 @@ trait Star extends NamedExpression {
   override def newInstance(): NamedExpression = throw new 
UnresolvedException("newInstance")
   override lazy val resolved = false
 
-  def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression]
+  /**
+   *
+   * Expand the * as either:
+   *  1. all the columns in the output of the input logical plan
+   *  2. struct expansion - returning the fields of the target struct as 
top-level columns
+   *     e.g. SELECT x.* => fields of struct x
+   *
+   * It uses output and metadata output attributes of the child
+   * for the expansion, and it supports both recursive and non-recursive data 
types.
+   *
+   * @param parameters The arguments needed for star expansion
+   */
+  def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression]
+
+  /**
+   * Entry point for fixed point analyzer.
+   */
+  final def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+    expandStar(
+      ExpandStarParameters(
+        childOperatorOutput = input.output,
+        childOperatorMetadataOutput = input.metadataOutput,
+        resolve = input.resolve,
+        suggestedAttributes = input.inputSet.toSeq,
+        resolver = resolver
+      )
+    )
+  }
 }
 
 /**
@@ -488,58 +535,36 @@ trait UnresolvedStarBase extends Star with Unevaluable {
     nameParts.corresponds(qualifierList)(resolver)
   }
 
-  def isQualifiedByTable(input: LogicalPlan, resolver: Resolver): Boolean = {
-    target.exists(nameParts => input.output.exists(matchedQualifier(_, 
nameParts, resolver)))
-  }
-
-  override def expand(
-      input: LogicalPlan,
-      resolver: Resolver): Seq[NamedExpression] = {
-    expandStar(input.output, input.metadataOutput, input.resolve, 
input.inputSet.toSeq, resolver)
+  def isQualifiedByTable(childOperatorOutput: Seq[Attribute], resolver: 
Resolver): Boolean = {
+    target.exists(nameParts => childOperatorOutput.exists(matchedQualifier(_, 
nameParts, resolver)))
   }
 
-  /**
-   * Method used to expand a star. It uses output and metadata output 
attributes of the child
-   * for the expansion and it supports both recursive and non-recursive data 
types.
-   *
-   * @param childOperatorOutput The output attributes of the child operator
-   * @param childOperatorMetadataOutput The metadata output attributes of the 
child operator
-   * @param resolve A function to resolve the given name parts to an attribute
-   * @param suggestedAttributes A list of attributes that are suggested for 
expansion
-   * @param resolver The resolver used to match the name parts
-   */
-  def expandStar(
-      childOperatorOutput: Seq[Attribute],
-      childOperatorMetadataOutput: Seq[Attribute],
-      resolve: (Seq[String], Resolver) => Option[NamedExpression],
-      suggestedAttributes: Seq[Attribute],
-      resolver: Resolver,
-      cleanupNestedAliasesDuringStructExpansion: Boolean = false
-  ): Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): 
Seq[NamedExpression] = {
     // If there is no table specified, use all non-hidden input attributes.
-    if (target.isEmpty) return childOperatorOutput
+    if (target.isEmpty) return parameters.childOperatorOutput
 
     // If there is a table specified, use hidden input attributes as well
-    val hiddenOutput = 
childOperatorMetadataOutput.filter(_.qualifiedAccessOnly)
+    val hiddenOutput = parameters.childOperatorMetadataOutput
+      .filter(_.qualifiedAccessOnly)
       // Remove the qualified-access-only restriction immediately. The 
expanded attributes will be
       // put in a logical plan node and becomes normal attributes. They can 
still keep the special
       // attribute metadata to indicate that they are from metadata columns, 
but they should not
       // keep any restrictions that may break column resolution for normal 
attributes.
       // See SPARK-42084 for more details.
       .map(_.markAsAllowAnyAccess())
-    val expandedAttributes = (hiddenOutput ++ childOperatorOutput).filter(
-      matchedQualifier(_, target.get, resolver))
+    val expandedAttributes = (hiddenOutput ++ parameters.childOperatorOutput)
+      .filter(matchedQualifier(_, target.get, parameters.resolver))
 
     if (expandedAttributes.nonEmpty) return expandedAttributes
 
     // Try to resolve it as a struct expansion. If there is a conflict and 
both are possible,
     // (i.e. [name].* is both a table and a struct), the struct path can 
always be qualified.
-    val attribute = resolve(target.get, resolver)
+    val attribute = parameters.resolve(target.get, parameters.resolver)
     if (attribute.isDefined) {
       // If cleanupNestedAliasesDuringStructExpansion is true, we remove 
nested aliases during
       // struct expansion. This is something which is done in the 
CleanupAliases rule but for the
       // single-pass analyzer it has to be done here to avoid additional tree 
traversals.
-      val normalizedAttribute = if (cleanupNestedAliasesDuringStructExpansion) 
{
+      val normalizedAttribute = if 
(parameters.cleanupNestedAliasesDuringStructExpansion) {
         attribute.get match {
           case a: Alias => a.child
           case other => other
@@ -559,7 +584,7 @@ trait UnresolvedStarBase extends Star with Unevaluable {
           throw 
QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get)
       }
     } else {
-      val from = suggestedAttributes.map(_.name).map(toSQLId).mkString(", ")
+      val from = 
parameters.suggestedAttributes.map(_.name).map(toSQLId).mkString(", ")
       val targetString = target.get.mkString(".")
       throw 
QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError(
         targetString, from)
@@ -592,28 +617,33 @@ case class UnresolvedStarExceptOrReplace(
 
   /**
    * We expand the * EXCEPT by the following three steps:
-   * 1. use the original .expand() to get top-level column list or struct 
expansion
+   * 1. use the original .expandStar() to get top-level column list or struct 
expansion
    * 2. resolve excepts (with respect to the Seq[NamedExpression] returned 
from (1))
    * 3. filter the expanded columns with the resolved except list. recursively 
apply filtering in
    *    case of nested columns in the except list (in order to rewrite structs)
    */
-  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
-    // Use the UnresolvedStarBase expand method to get a seq of 
NamedExpressions corresponding to
+  override def expandStar(parameters: ExpandStarParameters): 
Seq[NamedExpression] = {
+    // Use the expandStar method to get a seq of NamedExpressions 
corresponding to
     // the star expansion. This will yield a list of top-level columns from 
the logical plan's
     // output, or in the case of struct expansion (e.g. target=`x` for SELECT 
x.*) it will give
-    // a seq of Alias wrapping the struct field extraction.
-    val expandedCols = super.expand(input, resolver)
+    // a seq of NamedExpressions corresponding to struct fields.
+    val expandedCols = super.expandStar(parameters)
 
     // resolve except list with respect to the expandedCols
     val resolvedExcepts = excepts.map { exceptParts =>
-      AttributeSeq(expandedCols.map(_.toAttribute)).resolve(exceptParts, 
resolver).getOrElse {
-        val orderedCandidates = 
StringUtils.orderSuggestedIdentifiersBySimilarity(
-          UnresolvedAttribute(exceptParts).name, expandedCols.map(a => 
a.qualifier :+ a.name))
-        // if target is defined and expandedCols does not include any 
Attributes, it must be struct
-        // expansion; give message suggesting to use unqualified names of 
nested fields.
-        throw QueryCompilationErrors
-          .unresolvedColumnError(UnresolvedAttribute(exceptParts).name, 
orderedCandidates)
-      }
+      AttributeSeq(expandedCols.map(_.toAttribute))
+        .resolve(exceptParts, parameters.resolver)
+        .getOrElse {
+          val orderedCandidates = 
StringUtils.orderSuggestedIdentifiersBySimilarity(
+            UnresolvedAttribute(exceptParts).name,
+            expandedCols.map(a => a.qualifier :+ a.name)
+          )
+          // if target is defined and expandedCols does not include any 
Attributes,
+          // it must be struct expansion;
+          // give message suggesting to use unqualified names of nested fields.
+          throw QueryCompilationErrors
+            .unresolvedColumnError(UnresolvedAttribute(exceptParts).name, 
orderedCandidates)
+        }
     }
 
     // Convert each resolved except into a pair of (col: Attribute, 
nestedColumn) representing the
@@ -693,9 +723,10 @@ case class UnresolvedStarExceptOrReplace(
           val newExcepts = nestedExcepts.map { nestedExcept =>
             // INVARIANT: we cannot have duplicate column names in nested 
columns, thus, this `head`
             // will find the one and only column corresponding to the correct 
extractedField.
-            extractedFields.collectFirst { case col if resolver(col.name, 
nestedExcept.head) =>
-              col.toAttribute -> nestedExcept.tail
-            }.get
+          extractedFields.collectFirst {
+                case col if parameters.resolver(col.name, nestedExcept.head) =>
+                  col.toAttribute -> nestedExcept.tail
+              }.get
           }
           Alias(CreateStruct(
             filterColumns(extractedFields.toImmutableArraySeq, newExcepts)), 
col.name)()
@@ -740,7 +771,7 @@ case class UnresolvedStarWithColumns(
       newChildren: IndexedSeq[Expression]): UnresolvedStarWithColumns =
     copy(exprs = newChildren)
 
-  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): 
Seq[NamedExpression] = {
     assert(colNames.size == exprs.size,
       s"The size of column names: ${colNames.size} isn't equal to " +
         s"the size of expressions: ${exprs.size}")
@@ -750,9 +781,9 @@ case class UnresolvedStarWithColumns(
           s"the size of metadata elements: ${m.size}")
     }
 
-    SchemaUtils.checkColumnNameDuplication(colNames, resolver)
+    SchemaUtils.checkColumnNameDuplication(colNames, parameters.resolver)
 
-    val expandedCols = super.expand(input, resolver)
+    val expandedCols = super.expandStar(parameters)
 
     val columnSeq = explicitMetadata match {
       case Some(ms) => colNames.zip(exprs).zip(ms.map(Some(_)))
@@ -761,7 +792,7 @@ case class UnresolvedStarWithColumns(
 
     val replacedAndExistingColumns = expandedCols.map { field =>
       columnSeq.find { case ((colName, _), _) =>
-        resolver(field.name, colName)
+        parameters.resolver(field.name, colName)
       } match {
         case Some(((colName, expr), m)) => Alias(expr, 
colName)(explicitMetadata = m)
         case _ => field
@@ -769,7 +800,7 @@ case class UnresolvedStarWithColumns(
     }
 
     val newColumns = columnSeq.filter { case ((colName, _), _) =>
-      !expandedCols.exists(f => resolver(f.name, colName))
+      !expandedCols.exists(f => parameters.resolver(f.name, colName))
     }.map {
       case ((colName, expr), m) => Alias(expr, colName)(explicitMetadata = m)
     }
@@ -796,17 +827,17 @@ case class UnresolvedStarWithColumnsRenames(
 
   override def target: Option[Seq[String]] = None
 
-  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): 
Seq[NamedExpression] = {
     assert(existingNames.size == newNames.size,
       s"The size of existing column names: ${existingNames.size} isn't equal 
to " +
         s"the size of new column names: ${newNames.size}")
 
-    val expandedCols = super.expand(input, resolver)
+    val expandedCols = super.expandStar(parameters)
 
     existingNames.zip(newNames).foldLeft(expandedCols) {
       case (attrs, (existingName, newName)) =>
         attrs.map(attr =>
-          if (resolver(attr.name, existingName)) {
+          if (parameters.resolver(attr.name, existingName)) {
             Alias(attr, newName)()
           } else {
             attr
@@ -841,14 +872,15 @@ case class UnresolvedStar(target: Option[Seq[String]])
  */
 case class UnresolvedRegex(regexPattern: String, table: Option[String], 
caseSensitive: Boolean)
   extends LeafExpression with Star with Unevaluable {
-  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): 
Seq[NamedExpression] = {
     val pattern = if (caseSensitive) regexPattern else s"(?i)$regexPattern"
     table match {
       // If there is no table specified, use all input attributes that match 
expr
-      case None => input.output.filter(_.name.matches(pattern))
+      case None => 
parameters.childOperatorOutput.filter(_.name.matches(pattern))
       // If there is a table, pick out attributes that are part of this table 
that match expr
-      case Some(t) => input.output.filter(a => a.qualifier.nonEmpty &&
-        resolver(a.qualifier.last, t)).filter(_.name.matches(pattern))
+      case Some(t) =>
+        parameters.childOperatorOutput.filter(a => a.qualifier.nonEmpty &&
+          parameters.resolver(a.qualifier.last, 
t)).filter(_.name.matches(pattern))
     }
   }
 
@@ -901,7 +933,7 @@ case class MultiAlias(child: Expression, names: Seq[String])
 case class ResolvedStar(expressions: Seq[NamedExpression])
   extends LeafExpression with Star with Unevaluable {
   override def newInstance(): NamedExpression = throw new 
UnresolvedException("newInstance")
-  override def expand(input: LogicalPlan, resolver: Resolver): 
Seq[NamedExpression] = expressions
+  override def expandStar(parameters: ExpandStarParameters): 
Seq[NamedExpression] = expressions
   override def toString: String = expressions.mkString("ResolvedStar(", ", ", 
")")
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to