This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8fde8bd68ae [SPARK-42084][SQL] Avoid leaking the qualified-access-only
restriction
8fde8bd68ae is described below
commit 8fde8bd68ae51757be29f4b586556eb25b3aa2b7
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jan 18 18:43:55 2023 +0800
[SPARK-42084][SQL] Avoid leaking the qualified-access-only restriction
### What changes were proposed in this pull request?
This is a better fix than https://github.com/apache/spark/pull/39077 and
https://github.com/apache/spark/pull/38862
The special attribute metadata `__qualified_access_only` is very risky, as
it breaks normal column resolution. The aforementioned 2 PRs remove the
restriction in `SubqueryAlias` and `Alias`, but it's not good enough as we may
forget to do the same thing for new logical plans/expressions in the future.
It's also problematic if advanced users manipulate logical plans and
expressions directly, when there is no `SubqueryAlias` or `Alias` to remove
the restriction.
To be safe, we should only apply this restriction when resolving join
hidden columns, which means the plan node right above `Project(Join(using or
natural join))`. This PR simply removes the restriction when a column is
resolved from a sequence of `Attributes`, or from star expansion, and also when
adding the `Project` hidden columns to its output. This makes sure that the
qualified-access-only restriction will not be leaked to normal column
resolution, but only to metadata column resolution.
### Why are the changes needed?
To make the join hidden column feature more robust
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes #39596 from cloud-fan/join.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++++++---
.../spark/sql/catalyst/analysis/unresolved.scala | 6 ++++++
.../sql/catalyst/expressions/namedExpressions.scala | 7 ++-----
.../spark/sql/catalyst/expressions/package.scala | 7 ++++++-
.../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +-
.../catalyst/plans/logical/basicLogicalOperators.scala | 10 +---------
.../org/apache/spark/sql/catalyst/util/package.scala | 18 ++++++++++++------
7 files changed, 34 insertions(+), 25 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d6b68a45e77..ba2c2759e2d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -194,7 +194,10 @@ class Analyzer(override val catalogManager: CatalogManager)
override protected def isPlanIntegral(
previousPlan: LogicalPlan,
currentPlan: LogicalPlan): Boolean = {
- !Utils.isTesting ||
LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan)
+ import org.apache.spark.sql.catalyst.util._
+ !Utils.isTesting ||
(LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
+ (!LogicalPlanIntegrity.canGetOutputAttrs(currentPlan) ||
+ !currentPlan.output.exists(_.qualifiedAccessOnly)))
}
override def isView(nameParts: Seq[String]): Boolean =
v1SessionCatalog.isView(nameParts)
@@ -984,7 +987,6 @@ class Analyzer(override val catalogManager: CatalogManager)
* projecting away metadata columns prematurely.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
-
import org.apache.spark.sql.catalyst.util._
def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsDownWithPruning(
@@ -1039,7 +1041,8 @@ class Analyzer(override val catalogManager:
CatalogManager)
s.withMetadataColumns()
case p: Project if p.metadataOutput.exists(a =>
requiredAttrIds.contains(a.exprId)) =>
val newProj = p.copy(
- projectList = p.projectList ++ p.metadataOutput,
+ // Do not leak the qualified-access-only restriction to normal plan
outputs.
+ projectList = p.projectList ++
p.metadataOutput.map(_.markAsAllowAnyAccess()),
child = addMetadataCol(p.child, requiredAttrIds))
newProj.copyTagsFrom(p)
newProj
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 4a4028dc4c4..5e20f12747b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -425,6 +425,12 @@ case class UnresolvedStar(target: Option[Seq[String]])
extends Star with Unevalu
// If there is a table specified, use hidden input attributes as well
val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly)
+ // Remove the qualified-access-only restriction immediately. The
expanded attributes will be
+ // put in a logical plan node and becomes normal attributes. They can
still keep the special
+ // attribute metadata to indicate that they are from metadata columns,
but they should not
+ // keep any restrictions that may break column resolution for normal
attributes.
+ // See SPARK-42084 for more details.
+ .map(_.markAsAllowAnyAccess())
val expandedAttributes = (hiddenOutput ++ input.output).filter(
matchedQualifier(_, target.get, resolver))
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index ed820c80561..d18cfea1629 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.{quoteIfNeeded,
METADATA_COL_ATTR_KEY}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.collection.ImmutableBitSet
@@ -190,10 +190,7 @@ case class Alias(child: Expression, name: String)(
override def toAttribute: Attribute = {
if (resolved) {
- val a = AttributeReference(name, child.dataType, child.nullable,
metadata)(exprId, qualifier)
- // Alias has its own qualifier. It doesn't make sense to still restrict
the hidden columns
- // of natural/using join to be accessed by qualified name only.
- if (a.qualifiedAccessOnly) a.markAsAllowAnyAccess() else a
+ AttributeReference(name, child.dataType, child.nullable,
metadata)(exprId, qualifier)
} else {
UnresolvedAttribute.quoted(name)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 44813ac7b61..74f0875c285 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -337,7 +337,12 @@ package object expressions {
}
def name = UnresolvedAttribute(nameParts).name
- prunedCandidates match {
+ // We may have resolved the attributes from metadata columns. The
resolved attributes will be
+ // put in a logical plan node and becomes normal attributes. They can
still keep the special
+ // attribute metadata to indicate that they are from metadata columns,
but they should not
+ // keep any restrictions that may break column resolution for normal
attributes.
+ // See SPARK-42084 for more details.
+ prunedCandidates.map(_.markAsAllowAnyAccess()) match {
case Seq(a) if nestedFields.nonEmpty =>
// One match, but we also need to extract the requested nested field.
// The foldLeft adds ExtractValues for every remaining parts of the
identifier,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7640d9234c7..d3df6f0dd98 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -211,7 +211,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode {
object LogicalPlanIntegrity {
- private def canGetOutputAttrs(p: LogicalPlan): Boolean = {
+ def canGetOutputAttrs(p: LogicalPlan): Boolean = {
p.resolved && !p.expressions.exists { e =>
e.exists {
// We cannot call `output` in plans with a `ScalarSubquery` expr
having no column,
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 586e344df5e..343fa3517c6 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1664,15 +1664,7 @@ case class SubqueryAlias(
override def output: Seq[Attribute] = {
val qualifierList = identifier.qualifier :+ alias
- child.output.map { attr =>
- // `SubqueryAlias` sets a new qualifier for its output columns. It
doesn't make sense to still
- // restrict the hidden columns of natural/using join to be accessed by
qualified name only.
- if (attr.qualifiedAccessOnly) {
- attr.markAsAllowAnyAccess().withQualifier(qualifierList)
- } else {
- attr.withQualifier(qualifierList)
- }
- }
+ child.output.map(_.withQualifier(qualifierList))
}
override def metadataOutput: Seq[Attribute] = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 1fcd3f7662b..6466afac619 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -213,11 +213,17 @@ package object util extends Logging {
.build()
)
- def markAsAllowAnyAccess(): Attribute = attr.withMetadata(
- new MetadataBuilder()
- .withMetadata(attr.metadata)
- .remove(QUALIFIED_ACCESS_ONLY)
- .build()
- )
+ def markAsAllowAnyAccess(): Attribute = {
+ if (qualifiedAccessOnly) {
+ attr.withMetadata(
+ new MetadataBuilder()
+ .withMetadata(attr.metadata)
+ .remove(QUALIFIED_ACCESS_ONLY)
+ .build()
+ )
+ } else {
+ attr
+ }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]