This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 48d0007  [SPARK-34331][SQL] Speed up DS v2 metadata col resolution
48d0007 is described below

commit 48d0007c2fe49d91aab7939c6376cdd82a4f88e2
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Feb 5 16:37:29 2021 +0800

[SPARK-34331][SQL] Speed up DS v2 metadata col resolution

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/28027

https://github.com/apache/spark/pull/28027 added a DS v2 API that allows data sources to produce metadata/hidden columns that can only be seen when they are explicitly selected. The way we integrate this API into Spark is:
1. The v2 relation gets normal output and metadata output from the data source, and the metadata output is excluded from the plan output by default.
2. Column resolution can resolve `UnresolvedAttribute` with metadata columns, even if the child plan doesn't output metadata columns.
3. An analyzer rule searches the query plan, trying to find a node that has missing inputs. If such a node is found, it transforms the sub-plan of that node and updates the v2 relation to include the metadata output.

The analyzer rule in step 3 brings a perf regression, even for queries that do not read v2 tables at all. This rule calculates `QueryPlan.inputSet` (which builds an `AttributeSet` from the outputs of all children) and `QueryPlan.missingInput` (which does a set exclusion and creates a new `AttributeSet`) for every plan node in the query plan. In our benchmark, TPCDS query compilation time increases by more than 10%.

This PR proposes a simple way to improve it: we add a special metadata entry to the metadata attribute, which allows us to quickly check whether a plan needs to add metadata columns. We just check all the references of the plan and see whether any attribute contains the special metadata entry, instead of calculating `QueryPlan.missingInput`.

This PR also fixes one bug: we should not change the final output schema of the plan if we only use metadata columns in operators like filter, sort, etc.

### Why are the changes needed?

Fix a perf regression in SQL query compilation, and fix a bug.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Run `org.apache.spark.sql.TPCDSQuerySuite`. Before this PR, `AddMetadataColumns` was among the top 4 rules ranked by running time:
```
=== Metrics of Analyzer/Optimizer Rules ===
Total number of runs: 407641
Total time: 47.257239779 seconds

Rule                                 Effective Time / Total Time   Effective Runs / Total Runs
OptimizeSubqueries                   4157690003 / 8485444626       49 / 2778
Analyzer$ResolveAggregateFunctions   1238968711 / 3369351761       49 / 2141
ColumnPruning                        660038236 / 2924755292        338 / 6391
Analyzer$AddMetadataColumns          0 / 2918352992                0 / 2151
```
After this PR:
```
Analyzer$AddMetadataColumns          0 / 122885629                 0 / 2151
```
The rule is more than 20 times faster and its contribution to the total compilation time is now negligible.

This PR also adds new tests to verify the bug fix.

Closes #31440 from cloud-fan/metadata-col.
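For illustration only, here is a minimal, self-contained sketch (plain Scala, no Spark dependencies) of the check described above: tag metadata columns with a marker entry, then decide per plan node by scanning only the attributes it references, instead of computing `missingInput` for every node. `Attr`, `Node`, and `needsMetadataColumns` are hypothetical stand-ins, not Spark APIs; the real implementation is in the diff below.

```scala
// Hypothetical, simplified model of the optimization described in this commit message.
// `Attr` and `Node` stand in for Spark's Attribute and LogicalPlan; they are not Spark APIs.
object MetadataColResolutionSketch {
  final case class Attr(name: String, metadata: Map[String, Boolean] = Map.empty) {
    // Mirrors the idea of the "__metadata_col" marker entry added by this commit.
    def isMetadataCol: Boolean = metadata.getOrElse("__metadata_col", false)
  }

  // A plan node that references some attributes and whose children produce `childOutput`.
  final case class Node(references: Seq[Attr], childOutput: Seq[Attr])

  // Cheap check: does this node reference a metadata column that its children don't produce?
  // Only the node's own references are scanned, so no missingInput-style set math is needed.
  def needsMetadataColumns(node: Node): Boolean = {
    val inputs = node.childOutput.map(_.name).toSet
    node.references.exists(a => a.isMetadataCol && !inputs.contains(a.name))
  }

  def main(args: Array[String]): Unit = {
    val id = Attr("id")
    val index = Attr("index", Map("__metadata_col" -> true))
    println(needsMetadataColumns(Node(Seq(id, index), Seq(id))))  // true: metadata col must be added
    println(needsMetadataColumns(Node(Seq(id), Seq(id, index))))  // false: nothing to add
  }
}
```

The real rule additionally projects away the extra metadata columns when they would otherwise change the plan's final output schema, as the `Project(node.output, newNode)` branch in the diff below shows.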
Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 989eb6884d77226ab4f494a4237e09aea54a032d)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 34 +++++++++++++++--
 .../datasources/v2/DataSourceV2Implicits.scala     | 14 +++++--
 .../apache/spark/sql/connector/InMemoryTable.scala |  4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 44 +++++++++++++++++-----
 4 files changed, 77 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 388b2f0..e9e8ba8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -952,11 +952,37 @@ class Analyzer(override val catalogManager: CatalogManager)
    * columns are not accidentally selected by *.
    */
   object AddMetadataColumns extends Rule[LogicalPlan] {
+    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+    private def hasMetadataCol(plan: LogicalPlan): Boolean = {
+      plan.expressions.exists(_.find {
+        case a: Attribute => a.isMetadataCol
+        case _ => false
+      }.isDefined)
+    }
+
+    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
+      case r: DataSourceV2Relation => r.withMetadataColumns()
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
-        node resolveOperatorsUp {
-          case rel: DataSourceV2Relation =>
-            rel.withMetadataColumns()
+      case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
+        val inputAttrs = AttributeSet(node.children.flatMap(_.output))
+        val metaCols = node.expressions.flatMap(_.collect {
+          case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
+        })
+        if (metaCols.isEmpty) {
+          node
+        } else {
+          val newNode = addMetadataCol(node)
+          // We should not change the output schema of the plan. We should project away the extra
+          // metadata columns if necessary.
+          if (newNode.sameOutput(node)) {
+            newNode
+          } else {
+            Project(node.output, newNode)
+          }
         }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index 8d91ea7..4326c73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -21,12 +21,14 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 object DataSourceV2Implicits {
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
   implicit class TableHelper(table: Table) {
     def asReadable: SupportsRead = {
       table match {
@@ -83,7 +85,8 @@ object DataSourceV2Implicits {
   implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
     def asStruct: StructType = {
       val fields = metadata.map { metaCol =>
-        val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
+        val fieldMeta = new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, true).build()
+        val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable, fieldMeta)
         Option(metaCol.comment).map(field.withComment).getOrElse(field)
       }
       StructType(fields)
@@ -92,6 +95,11 @@ object DataSourceV2Implicits {
     def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
   }
 
+  implicit class MetadataColumnHelper(attr: Attribute) {
+    def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
+      attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
+  }
+
   implicit class OptionsHelper(options: Map[String, String]) {
     def asOptions: CaseInsensitiveStringMap = {
       new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index 257c380..508d793 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
-import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DateType, IntegerType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -58,7 +58,7 @@ class InMemoryTable(
 
   private object IndexColumn extends MetadataColumn {
     override def name: String = "index"
-    override def dataType: DataType = StringType
+    override def dataType: DataType = IntegerType
     override def comment: String = "Metadata column used to conflict with a data column"
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 42d92b1..0e12eba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -142,7 +142,7 @@ class DataSourceV2SQLSuite
       Array("Part 0", "id", ""),
       Array("", "", ""),
       Array("# Metadata Columns", "", ""),
-      Array("index", "string", "Metadata column used to conflict with a data column"),
+      Array("index", "int", "Metadata column used to conflict with a data column"),
       Array("_partition", "string", "Partition key used to store the row"),
       Array("", "", ""),
       Array("# Detailed Table Information", "", ""),
@@ -2593,9 +2593,12 @@ class DataSourceV2SQLSuite
         "PARTITIONED BY (bucket(4, id), id)")
       sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
 
-      checkAnswer(
-        spark.sql(s"SELECT id, data, _partition FROM $t1"),
-        Seq(Row(1, "a", "3/1"), Row(2, "b", "0/2"), Row(3, "c", "1/3")))
+      val sqlQuery = spark.sql(s"SELECT id, data, index, _partition FROM $t1")
+      val dfQuery = spark.table(t1).select("id", "data", "index", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), Row(3, "c", 0, "1/3")))
+      }
     }
   }
 
@@ -2606,9 +2609,12 @@ class DataSourceV2SQLSuite
         "PARTITIONED BY (bucket(4, index), index)")
      sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
 
-      checkAnswer(
-        spark.sql(s"SELECT index, data, _partition FROM $t1"),
-        Seq(Row(3, "c", "1/3"), Row(2, "b", "0/2"), Row(1, "a", "3/1")))
+      val sqlQuery = spark.sql(s"SELECT index, data, _partition FROM $t1")
+      val dfQuery = spark.table(t1).select("index", "data", "_partition")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(3, "c", "1/3"), Row(2, "b", "0/2"), Row(1, "a", "3/1")))
+      }
     }
   }
 
@@ -2619,9 +2625,27 @@ class DataSourceV2SQLSuite
         "PARTITIONED BY (bucket(4, id), id)")
       sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
 
-      checkAnswer(
-        spark.sql(s"SELECT * FROM $t1"),
-        Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
+      val sqlQuery = spark.sql(s"SELECT * FROM $t1")
+      val dfQuery = spark.table(t1)
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        checkAnswer(query, Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
+      }
+    }
+  }
+
+  test("SPARK-31255: metadata column should only be produced when necessary") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+
+      val sqlQuery = spark.sql(s"SELECT * FROM $t1 WHERE index = 0")
+      val dfQuery = spark.table(t1).filter("index = 0")
+
+      Seq(sqlQuery, dfQuery).foreach { query =>
+        assert(query.schema.fieldNames.toSeq == Seq("id", "data"))
+      }
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org