This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b08ac303faed [SPARK-53521] Refactor Star expression
b08ac303faed is described below

commit b08ac303faed596efca499136f68884064c62ee2
Author: Mikhail Nikoliukin <mikhail.nikoliu...@databricks.com>
AuthorDate: Tue Sep 9 11:33:05 2025 +0800

    [SPARK-53521] Refactor Star expression

    ### What changes were proposed in this pull request?
    Small refactor of the Star trait to make it compatible with the new single-pass Analyzer. Basically, remove `LogicalPlan` from the signature of core methods. This makes it possible to call them using `NameScope`.

    ### Why are the changes needed?
    To eventually support all types of star expressions in the single-pass analyzer.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Claude Code v1.0.107

    Closes #52268 from mikhailnik-db/refactor-star-trait.

    Authored-by: Mikhail Nikoliukin <mikhail.nikoliu...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +-
 .../sql/catalyst/analysis/resolver/NameScope.scala |  23 +--
 .../spark/sql/catalyst/analysis/unresolved.scala   | 158 +++++++++++++--------
 3 files changed, 109 insertions(+), 74 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1896a1c7ac27..69731cd9576f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1945,7 +1945,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
             f1.nameParts == Seq("count") &&
             f1.arguments.length == 1) {
           f1.arguments.foreach {
-            case u: UnresolvedStar if u.isQualifiedByTable(child, resolver) =>
+            case u: UnresolvedStar if u.isQualifiedByTable(child.output, resolver) =>
               throw QueryCompilationErrors
                 .singleTableStarInCountNotAllowedError(u.target.get.mkString("."))
             case _ => // do nothing
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
index 3ccae116cb18..ed731432e2a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/NameScope.scala
@@ -24,9 +24,10 @@ import scala.jdk.CollectionConverters._

 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.{
+  ExpandStarParameters,
   LiteralFunctionResolution,
   Resolver => NameComparator,
-  UnresolvedStar
+  Star
 }
 import org.apache.spark.sql.catalyst.expressions.{
   Alias,
@@ -323,7 +324,7 @@ class NameScope(
   }

   /**
-   * Expand the [[UnresolvedStar]]. The expected use case for this method is star expansion inside
+   * Expand the [[Star]]. The expected use case for this method is star expansion inside
    * [[Project]].
    *
    * Star without a target:
@@ -360,16 +361,18 @@
    * SELECT concat_ws('', *) AS result FROM VALUES (1, 2, 3);
    * }}}
    *
-   * Also, see [[UnresolvedStarBase.expandStar]] for more details.
+   * Also, see [[Star.expandStar]] for more details.
   */
-  def expandStar(unresolvedStar: UnresolvedStar): Seq[NamedExpression] = {
+  def expandStar(unresolvedStar: Star): Seq[NamedExpression] = {
     unresolvedStar.expandStar(
-      childOperatorOutput = output,
-      childOperatorMetadataOutput = hiddenOutput,
-      resolve = (nameParts, comparator) => resolveNameInStarExpansion(nameParts, comparator),
-      suggestedAttributes = output,
-      resolver = nameComparator,
-      cleanupNestedAliasesDuringStructExpansion = true
+      ExpandStarParameters(
+        childOperatorOutput = output,
+        childOperatorMetadataOutput = hiddenOutput,
+        resolve = (nameParts, comparator) => resolveNameInStarExpansion(nameParts, comparator),
+        suggestedAttributes = output,
+        resolver = nameComparator,
+        cleanupNestedAliasesDuringStructExpansion = true
+      )
     )
   }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 253597574d9f..2e0dbe9b7f42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -425,6 +425,26 @@ object UnresolvedFunction {
   }
 }

+/**
+ * Encapsulates the arguments needed for [[Star.expandStar]].
+ *
+ * @param childOperatorOutput The output attributes of the child operator
+ * @param childOperatorMetadataOutput The metadata output attributes of the child operator
+ * @param resolve A function to resolve the given name parts to an attribute
+ * @param suggestedAttributes A list of attributes that are suggested for expansion
+ * @param resolver The resolver used to match the name parts
+ * @param cleanupNestedAliasesDuringStructExpansion Whether to cleanup nested aliases during
+ *                                                  struct expansion
+ */
+case class ExpandStarParameters(
+    childOperatorOutput: Seq[Attribute],
+    childOperatorMetadataOutput: Seq[Attribute],
+    resolve: (Seq[String], Resolver) => Option[NamedExpression],
+    suggestedAttributes: Seq[Attribute],
+    resolver: Resolver,
+    cleanupNestedAliasesDuringStructExpansion: Boolean = false
+)
+
 /**
  * Represents all of the input attributes to a given relational operator, for example in
  * "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis.
@@ -440,7 +460,34 @@ trait Star extends NamedExpression {
   override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
   override lazy val resolved = false

-  def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression]
+  /**
+   *
+   * Expand the * as either:
+   * 1. all the columns in the output of the input logical plan
+   * 2. struct expansion - returning the fields of the target struct as top-level columns
+   *    e.g. SELECT x.* => fields of struct x
+   *
+   * It uses output and metadata output attributes of the child
+   * for the expansion, and it supports both recursive and non-recursive data types.
+   *
+   * @param parameters The arguments needed for star expansion
+   */
+  def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression]
+
+  /**
+   * Entry point for fixed point analyzer.
+   */
+  final def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
+    expandStar(
+      ExpandStarParameters(
+        childOperatorOutput = input.output,
+        childOperatorMetadataOutput = input.metadataOutput,
+        resolve = input.resolve,
+        suggestedAttributes = input.inputSet.toSeq,
+        resolver = resolver
+      )
+    )
+  }
 }

 /**
@@ -488,58 +535,36 @@ trait UnresolvedStarBase extends Star with Unevaluable {
     nameParts.corresponds(qualifierList)(resolver)
   }

-  def isQualifiedByTable(input: LogicalPlan, resolver: Resolver): Boolean = {
-    target.exists(nameParts => input.output.exists(matchedQualifier(_, nameParts, resolver)))
-  }
-
-  override def expand(
-      input: LogicalPlan,
-      resolver: Resolver): Seq[NamedExpression] = {
-    expandStar(input.output, input.metadataOutput, input.resolve, input.inputSet.toSeq, resolver)
+  def isQualifiedByTable(childOperatorOutput: Seq[Attribute], resolver: Resolver): Boolean = {
+    target.exists(nameParts => childOperatorOutput.exists(matchedQualifier(_, nameParts, resolver)))
   }

-  /**
-   * Method used to expand a star. It uses output and metadata output attributes of the child
-   * for the expansion and it supports both recursive and non-recursive data types.
-   *
-   * @param childOperatorOutput The output attributes of the child operator
-   * @param childOperatorMetadataOutput The metadata output attributes of the child operator
-   * @param resolve A function to resolve the given name parts to an attribute
-   * @param suggestedAttributes A list of attributes that are suggested for expansion
-   * @param resolver The resolver used to match the name parts
-   */
-  def expandStar(
-      childOperatorOutput: Seq[Attribute],
-      childOperatorMetadataOutput: Seq[Attribute],
-      resolve: (Seq[String], Resolver) => Option[NamedExpression],
-      suggestedAttributes: Seq[Attribute],
-      resolver: Resolver,
-      cleanupNestedAliasesDuringStructExpansion: Boolean = false
-  ): Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression] = {
     // If there is no table specified, use all non-hidden input attributes.
-    if (target.isEmpty) return childOperatorOutput
+    if (target.isEmpty) return parameters.childOperatorOutput

     // If there is a table specified, use hidden input attributes as well
-    val hiddenOutput = childOperatorMetadataOutput.filter(_.qualifiedAccessOnly)
+    val hiddenOutput = parameters.childOperatorMetadataOutput
+      .filter(_.qualifiedAccessOnly)
       // Remove the qualified-access-only restriction immediately. The expanded attributes will be
      // put in a logical plan node and becomes normal attributes. They can still keep the special
      // attribute metadata to indicate that they are from metadata columns, but they should not
      // keep any restrictions that may break column resolution for normal attributes.
      // See SPARK-42084 for more details.
       .map(_.markAsAllowAnyAccess())
-    val expandedAttributes = (hiddenOutput ++ childOperatorOutput).filter(
-      matchedQualifier(_, target.get, resolver))
+    val expandedAttributes = (hiddenOutput ++ parameters.childOperatorOutput)
+      .filter(matchedQualifier(_, target.get, parameters.resolver))
     if (expandedAttributes.nonEmpty) return expandedAttributes

     // Try to resolve it as a struct expansion. If there is a conflict and both are possible,
     // (i.e. [name].* is both a table and a struct), the struct path can always be qualified.
-    val attribute = resolve(target.get, resolver)
+    val attribute = parameters.resolve(target.get, parameters.resolver)
     if (attribute.isDefined) {
       // If cleanupNestedAliasesDuringStructExpansion is true, we remove nested aliases during
       // struct expansion. This is something which is done in the CleanupAliases rule but for the
       // single-pass analyzer it has to be done here to avoid additional tree traversals.
-      val normalizedAttribute = if (cleanupNestedAliasesDuringStructExpansion) {
+      val normalizedAttribute = if (parameters.cleanupNestedAliasesDuringStructExpansion) {
         attribute.get match {
           case a: Alias => a.child
           case other => other
@@ -559,7 +584,7 @@ trait UnresolvedStarBase extends Star with Unevaluable {
           throw QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get)
       }
     } else {
-      val from = suggestedAttributes.map(_.name).map(toSQLId).mkString(", ")
+      val from = parameters.suggestedAttributes.map(_.name).map(toSQLId).mkString(", ")
       val targetString = target.get.mkString(".")
       throw QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError(
         targetString, from)
@@ -592,28 +617,33 @@ case class UnresolvedStarExceptOrReplace(

   /**
    * We expand the * EXCEPT by the following three steps:
-   * 1. use the original .expand() to get top-level column list or struct expansion
+   * 1. use the original .expandStar() to get top-level column list or struct expansion
    * 2. resolve excepts (with respect to the Seq[NamedExpression] returned from (1))
    * 3. filter the expanded columns with the resolved except list. recursively apply filtering in
    *    case of nested columns in the except list (in order to rewrite structs)
    */
-  override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
-    // Use the UnresolvedStarBase expand method to get a seq of NamedExpressions corresponding to
+  override def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression] = {
+    // Use the expandStar method to get a seq of NamedExpressions corresponding to
     // the star expansion. This will yield a list of top-level columns from the logical plan's
     // output, or in the case of struct expansion (e.g. target=`x` for SELECT x.*) it will give
-    // a seq of Alias wrapping the struct field extraction.
-    val expandedCols = super.expand(input, resolver)
+    // a seq of NamedExpressions corresponding to struct fields.
+    val expandedCols = super.expandStar(parameters)

     // resolve except list with respect to the expandedCols
     val resolvedExcepts = excepts.map { exceptParts =>
-      AttributeSeq(expandedCols.map(_.toAttribute)).resolve(exceptParts, resolver).getOrElse {
-        val orderedCandidates = StringUtils.orderSuggestedIdentifiersBySimilarity(
-          UnresolvedAttribute(exceptParts).name, expandedCols.map(a => a.qualifier :+ a.name))
-        // if target is defined and expandedCols does not include any Attributes, it must be struct
-        // expansion; give message suggesting to use unqualified names of nested fields.
-        throw QueryCompilationErrors
-          .unresolvedColumnError(UnresolvedAttribute(exceptParts).name, orderedCandidates)
-      }
+      AttributeSeq(expandedCols.map(_.toAttribute))
+        .resolve(exceptParts, parameters.resolver)
+        .getOrElse {
+          val orderedCandidates = StringUtils.orderSuggestedIdentifiersBySimilarity(
+            UnresolvedAttribute(exceptParts).name,
+            expandedCols.map(a => a.qualifier :+ a.name)
+          )
+          // if target is defined and expandedCols does not include any Attributes,
+          // it must be struct expansion;
+          // give message suggesting to use unqualified names of nested fields.
+          throw QueryCompilationErrors
+            .unresolvedColumnError(UnresolvedAttribute(exceptParts).name, orderedCandidates)
+        }
     }

     // Convert each resolved except into a pair of (col: Attribute, nestedColumn) representing the
@@ -693,9 +723,10 @@ case class UnresolvedStarExceptOrReplace(
           val newExcepts = nestedExcepts.map { nestedExcept =>
             // INVARIANT: we cannot have duplicate column names in nested columns, thus, this `head`
             // will find the one and only column corresponding to the correct extractedField.
-            extractedFields.collectFirst { case col if resolver(col.name, nestedExcept.head) =>
-              col.toAttribute -> nestedExcept.tail
-            }.get
+            extractedFields.collectFirst {
+              case col if parameters.resolver(col.name, nestedExcept.head) =>
+                col.toAttribute -> nestedExcept.tail
+            }.get
           }
           Alias(CreateStruct(
             filterColumns(extractedFields.toImmutableArraySeq, newExcepts)), col.name)()
@@ -740,7 +771,7 @@ case class UnresolvedStarWithColumns(
       newChildren: IndexedSeq[Expression]): UnresolvedStarWithColumns =
     copy(exprs = newChildren)

-  override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression] = {
     assert(colNames.size == exprs.size,
       s"The size of column names: ${colNames.size} isn't equal to " +
         s"the size of expressions: ${exprs.size}")
@@ -750,9 +781,9 @@ case class UnresolvedStarWithColumns(
         s"the size of metadata elements: ${m.size}")
     }

-    SchemaUtils.checkColumnNameDuplication(colNames, resolver)
+    SchemaUtils.checkColumnNameDuplication(colNames, parameters.resolver)

-    val expandedCols = super.expand(input, resolver)
+    val expandedCols = super.expandStar(parameters)

     val columnSeq = explicitMetadata match {
       case Some(ms) => colNames.zip(exprs).zip(ms.map(Some(_)))
@@ -761,7 +792,7 @@ case class UnresolvedStarWithColumns(

     val replacedAndExistingColumns = expandedCols.map { field =>
       columnSeq.find { case ((colName, _), _) =>
-        resolver(field.name, colName)
+        parameters.resolver(field.name, colName)
       } match {
         case Some(((colName, expr), m)) => Alias(expr, colName)(explicitMetadata = m)
         case _ => field
@@ -769,7 +800,7 @@ case class UnresolvedStarWithColumns(
     }

     val newColumns = columnSeq.filter { case ((colName, _), _) =>
-      !expandedCols.exists(f => resolver(f.name, colName))
+      !expandedCols.exists(f => parameters.resolver(f.name, colName))
     }.map { case ((colName, expr), m) =>
       Alias(expr, colName)(explicitMetadata = m)
     }
@@ -796,17 +827,17 @@ case class UnresolvedStarWithColumnsRenames(

   override def target: Option[Seq[String]] = None

-  override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression] = {
     assert(existingNames.size == newNames.size,
       s"The size of existing column names: ${existingNames.size} isn't equal to " +
         s"the size of new column names: ${newNames.size}")

-    val expandedCols = super.expand(input, resolver)
+    val expandedCols = super.expandStar(parameters)

     existingNames.zip(newNames).foldLeft(expandedCols) {
       case (attrs, (existingName, newName)) =>
         attrs.map(attr =>
-          if (resolver(attr.name, existingName)) {
+          if (parameters.resolver(attr.name, existingName)) {
             Alias(attr, newName)()
           } else {
             attr
@@ -841,14 +872,15 @@ case class UnresolvedStar(target: Option[Seq[String]])
  */
 case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSensitive: Boolean)
   extends LeafExpression with Star with Unevaluable {
-  override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = {
+  override def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression] = {
     val pattern = if (caseSensitive) regexPattern else s"(?i)$regexPattern"
     table match {
       // If there is no table specified, use all input attributes that match expr
-      case None => input.output.filter(_.name.matches(pattern))
+      case None => parameters.childOperatorOutput.filter(_.name.matches(pattern))
       // If there is a table, pick out attributes that are part of this table that match expr
-      case Some(t) => input.output.filter(a => a.qualifier.nonEmpty &&
-        resolver(a.qualifier.last, t)).filter(_.name.matches(pattern))
+      case Some(t) =>
+        parameters.childOperatorOutput.filter(a => a.qualifier.nonEmpty &&
+          parameters.resolver(a.qualifier.last, t)).filter(_.name.matches(pattern))
     }
   }

@@ -901,7 +933,7 @@ case class MultiAlias(child: Expression, names: Seq[String])
 case class ResolvedStar(expressions: Seq[NamedExpression])
   extends LeafExpression with Star with Unevaluable {
   override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance")
-  override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = expressions
+  override def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression] = expressions
   override def toString: String = expressions.mkString("ResolvedStar(", ", ", ")")
 }
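
For readers who only skim the patch, the shape of the refactored API can be condensed into the short Scala sketch below. This is a simplified illustration distilled from the hunks above, not the full Spark source: the real Star trait lives in unresolved.scala, extends NamedExpression, and has more members; the identifiers here (ExpandStarParameters, expandStar, expand) match the commit.

// Hedged sketch only -- condensed from the diff above, not the complete Spark source.
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Bundle of everything a star needs for expansion. It replaces the old
// (LogicalPlan, Resolver) arguments, so the single-pass resolver can fill it
// from NameScope state instead of a logical plan.
case class ExpandStarParameters(
    childOperatorOutput: Seq[Attribute],
    childOperatorMetadataOutput: Seq[Attribute],
    resolve: (Seq[String], Resolver) => Option[NamedExpression],
    suggestedAttributes: Seq[Attribute],
    resolver: Resolver,
    cleanupNestedAliasesDuringStructExpansion: Boolean = false)

trait Star {
  // Core method implemented by UnresolvedStar, UnresolvedRegex, ResolvedStar, etc.
  def expandStar(parameters: ExpandStarParameters): Seq[NamedExpression]

  // Entry point kept for the fixed-point Analyzer: it only packages the plan into the bundle.
  final def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] =
    expandStar(ExpandStarParameters(
      childOperatorOutput = input.output,
      childOperatorMetadataOutput = input.metadataOutput,
      resolve = input.resolve,
      suggestedAttributes = input.inputSet.toSeq,
      resolver = resolver))
}

With this shape, NameScope.expandStar can build the same parameter bundle from its own output and hiddenOutput rather than from a LogicalPlan, which is exactly what the NameScope.scala hunk above does.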