This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5dc6b10  [SPARK-34923][SQL] Metadata output should be empty for more plans
5dc6b10 is described below

commit 5dc6b10252c4831f7e19710fe26f9f65b5421427
Author: Karen Feng <karen.f...@databricks.com>
AuthorDate: Tue Apr 6 16:04:30 2021 +0800

    [SPARK-34923][SQL] Metadata output should be empty for more plans

    This changes the metadata propagation framework. Previously, most
    `LogicalPlan`s propagated their children's `metadataOutput`. This did
    not make sense in cases where the `LogicalPlan` did not even propagate
    its children's `output`. I set the metadata output to `Nil` for plans
    that do not propagate their children's `output`. Notably, `Project`
    and `View` no longer have metadata output.

    Previously, `SELECT m FROM (SELECT a FROM tb)` would output `m` if `m`
    were a metadata column, which did not make sense.

    This is a user-facing change: `SELECT m FROM (SELECT a FROM tb)` now
    fails with an `AnalysisException`.

    Added unit tests. I did not cover every case, as the cases are fairly
    extensive, but the new tests cover the major ones (and an existing
    test already covers `Join`).

    Closes #32017 from karenfeng/spark-34923.

    Authored-by: Karen Feng <karen.f...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 3b634f66c3e4a942178a1e322ae65ce82779625d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |   5 +-
 .../plans/logical/basicLogicalOperators.scala      |  25 +++++
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 103 +++++++++++++++++++++
 3 files changed, 132 insertions(+), 1 deletion(-)
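To make that example concrete, a rough before/after sketch (assuming, as in
the description, a hypothetical DSv2 table `tb` with a data column `a` and a
hidden metadata column `m`):

    // Before this patch: Project propagated its child's metadataOutput,
    // so the hidden column m leaked through the subquery and resolved.
    spark.sql("SELECT m FROM (SELECT a FROM tb)")  // previously returned m

    // After this patch: Project's metadataOutput is Nil, so the inner
    // SELECT hides m and the outer reference fails analysis.
    spark.sql("SELECT m FROM (SELECT a FROM tb)")  // now throws AnalysisException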
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index bdf37d0..3ea79b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -33,7 +33,10 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {
 
-  /** Metadata fields that can be projected from this node */
+  /**
+   * Metadata fields that can be projected from this node.
+   * Should be overridden if the plan does not propagate its children's output.
+   */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
   /** Returns true if this subtree has data from a streaming data source.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 338c1db..224e7bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -59,6 +59,7 @@ object Subquery {
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override def metadataOutput: Seq[Attribute] = Nil
   override def maxRows: Option[Long] = child.maxRows
 
   override lazy val resolved: Boolean = {
@@ -185,6 +186,8 @@ case class Intersect(
       leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
     }
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override protected lazy val validConstraints: ExpressionSet =
     leftConstraints.union(rightConstraints)
 
@@ -205,6 +208,8 @@ case class Except(
   /** We don't use right.output because those rows get excluded from the set. */
   override def output: Seq[Attribute] = left.output
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override protected lazy val validConstraints: ExpressionSet = leftConstraints
 }
 
@@ -268,6 +273,8 @@ case class Union(
     }
   }
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override lazy val resolved: Boolean = {
     // allChildrenCompatible needs to be evaluated after childrenResolved
     def allChildrenCompatible: Boolean =
@@ -343,6 +350,17 @@ case class Join(
     }
   }
 
+  override def metadataOutput: Seq[Attribute] = {
+    joinType match {
+      case ExistenceJoin(_) =>
+        left.metadataOutput
+      case LeftExistence(_) =>
+        left.metadataOutput
+      case _ =>
+        children.flatMap(_.metadataOutput)
+    }
+  }
+
   override protected lazy val validConstraints: ExpressionSet = {
     joinType match {
       case _: InnerLike if condition.isDefined =>
@@ -419,6 +437,7 @@ case class InsertIntoDir(
   extends UnaryNode {
 
   override def output: Seq[Attribute] = Seq.empty
+  override def metadataOutput: Seq[Attribute] = Nil
   override lazy val resolved: Boolean = false
 }
 
@@ -449,6 +468,8 @@ case class View(
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override def simpleString(maxFields: Int): String = {
     s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
   }
@@ -616,6 +637,7 @@ case class Aggregate(
   }
 
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
+  override def metadataOutput: Seq[Attribute] = Nil
   override def maxRows: Option[Long] = {
     if (groupingExpressions.isEmpty) {
       Some(1L)
@@ -751,6 +773,8 @@ case class Expand(
   override lazy val references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
 
   // This operator can reuse attributes (for example making them null when doing a roll up) so
@@ -813,6 +837,7 @@ case class Pivot(
     }
     groupByExprsOpt.getOrElse(Seq.empty).map(_.toAttribute) ++ pivotAgg
   }
+  override def metadataOutput: Seq[Attribute] = Nil
 }
 
 /**
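The catalyst changes above all follow one rule: a plan that redefines its
`output` (rather than forwarding a child's) must also hide metadata columns.
A minimal self-contained sketch of that rule, using hypothetical simplified
types rather than the real Spark classes:

    // Toy model of the metadataOutput propagation rule (not Spark source).
    object MetadataRuleSketch extends App {
      trait Node {
        def children: Seq[Node]
        def output: Seq[String]
        // Default, as in LogicalPlan: forward the children's metadata columns.
        def metadataOutput: Seq[String] = children.flatMap(_.metadataOutput)
      }

      // A leaf scan exposes regular columns plus hidden metadata columns.
      case class Scan(output: Seq[String], meta: Seq[String]) extends Node {
        def children: Seq[Node] = Nil
        override def metadataOutput: Seq[String] = meta
      }

      // Redefines its output, so it opts out -- the Project/View case above.
      case class Proj(output: Seq[String], child: Node) extends Node {
        def children: Seq[Node] = Seq(child)
        override def metadataOutput: Seq[String] = Nil
      }

      // Forwards its child's output, so the default applies -- the Filter case.
      case class Filt(child: Node) extends Node {
        def children: Seq[Node] = Seq(child)
        def output: Seq[String] = child.output
      }

      val scan = Scan(Seq("id", "data"), Seq("index", "_partition"))
      println(Filt(scan).metadataOutput)            // List(index, _partition)
      println(Proj(Seq("id"), scan).metadataOutput) // List(): hidden
    }

Join is the one nuance: for existence-style joins only the left side's
metadata survives, since the right side's columns are not part of the join's
output.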
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 3c49186..eccd6c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2794,6 +2794,109 @@ class DataSourceV2SQLSuite
     }.getMessage
     assert(errMsg.contains(expectedError))
   }
+
+  test("SPARK-34923: do not propagate metadata columns through Project") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      assertThrows[AnalysisException] {
+        sql(s"SELECT index, _partition from (SELECT id, data FROM $t1)")
+      }
+      assertThrows[AnalysisException] {
+        spark.table(t1).select("id", "data").select("index", "_partition")
+      }
+    }
+  }
+
+  test("SPARK-34923: do not propagate metadata columns through View") {
+    val t1 = s"${catalogAndNamespace}table"
+    val view = "view"
+
+    withTable(t1) {
+      withTempView(view) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+          "PARTITIONED BY (bucket(4, id), id)")
+        sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+        sql(s"CACHE TABLE $view AS SELECT * FROM $t1")
+        assertThrows[AnalysisException] {
+          sql(s"SELECT index, _partition FROM $view")
+        }
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through Filter") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 WHERE id > 1")
+      val dfQuery = spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through Sort") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1 ORDER BY id")
+      val dfQuery = spark.table(t1).orderBy("id").select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through RepartitionBy") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(
+        s"SELECT /*+ REPARTITION_BY_RANGE(3, id) */ id, data, index, _partition FROM $t1")
+      val tbl = spark.table(t1)
+      val dfQuery = tbl.repartitionByRange(3, tbl.col("id"))
+        .select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
+
+  test("SPARK-34923: propagate metadata columns through SubqueryAlias") {
+    val t1 = s"${catalogAndNamespace}table"
+    val sbq = "sbq"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val sqlQuery = spark.sql(
+        s"SELECT $sbq.id, $sbq.data, $sbq.index, $sbq._partition FROM $t1 as $sbq")
+      val dfQuery = spark.table(t1).as(sbq).select(
+        s"$sbq.id", s"$sbq.data", s"$sbq.index", s"$sbq._partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
+    }
+  }
 }
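Read together, the new tests pin down the contract: metadata columns such as
`index` and `_partition` survive pass-through operators (Filter, Sort,
repartitioning, SubqueryAlias) but are hidden by operators that redefine
their output (Project, View). As a rough usage sketch against the same table
`t1` as in the tests:

    // Still resolves: Filter forwards its child's output and metadata.
    spark.table(t1).where("id > 1").select("id", "data", "index", "_partition")

    // Fails analysis: the first select is a Project, which hides metadata.
    spark.table(t1).select("id", "data").select("index", "_partition")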