This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7e71378764f Revert "[SPARK-41498] Propagate metadata through Union"
7e71378764f is described below

commit 7e71378764fdaba42bf3868414045af920821742
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Mar 10 10:30:13 2023 -0800

    Revert "[SPARK-41498] Propagate metadata through Union"
    
    This reverts commit 827ca9b82476552458e8ba7b01b90001895e8384.
    
    ### What changes were proposed in this pull request?
    
    After more thinking, it's a bit fragile to propagate metadata columns 
through Union. We have added quite some new fields in the file source 
`_metadata` metadata column such as `row_index`, `block_start`, etc. Some are 
parquet only. The same thing may happen in other data sources as well. If one 
day one table under Union adds a new metadata column (or add a new field if the 
metadata column is a struct type), but other tables under Union do not have 
this new column, then Union can't pro [...]
    
    To be future-proof, let's revert this support.
    
    ### Why are the changes needed?
    
    to make the analysis behavior more robust.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but propagating metadata columns through Union is not released yet.
    
    ### How was this patch tested?
    
    N/A
    
    Closes #40371 from cloud-fan/revert.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 164db5ba3c39614017f5ef6428194a442d79b425)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   7 -
 .../plans/logical/basicLogicalOperators.scala      |  58 +----
 .../sql/connector/catalog/InMemoryBaseTable.scala  |   2 +-
 .../spark/sql/connector/MetadataColumnSuite.scala  | 283 ---------------------
 4 files changed, 14 insertions(+), 336 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e5d78b21f19..ddd26c2efe1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1040,13 +1040,6 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           child = addMetadataCol(p.child, requiredAttrIds))
         newProj.copyTagsFrom(p)
         newProj
-      case u: Union if u.metadataOutput.exists(a => 
requiredAttrIds.contains(a.exprId)) =>
-        u.withNewChildren(u.children.map { child =>
-          // The children of a Union will have the same attributes with 
different expression IDs
-          val exprIdMap = u.metadataOutput.map(_.exprId)
-            .zip(child.metadataOutput.map(_.exprId)).toMap
-          addMetadataCol(child, requiredAttrIds.map(a => 
exprIdMap.getOrElse(a, a)))
-        })
       case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, 
requiredAttrIds)))
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 74929bf5d79..21bf6419cdd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -450,55 +450,23 @@ case class Union(
       AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
   }
 
-  /**
-   * Merges a sequence of attributes to have a common datatype and updates the
-   * nullability to be consistent with the attributes being merged.
-   */
-  private def mergeAttributes(attributes: Seq[Attribute]): Attribute = {
-    val firstAttr = attributes.head
-    val nullable = attributes.exists(_.nullable)
-    val newDt = attributes.map(_.dataType).reduce(StructType.unionLikeMerge)
-    if (firstAttr.dataType == newDt) {
-      firstAttr.withNullability(nullable)
-    } else {
-      AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-        firstAttr.exprId, firstAttr.qualifier)
-    }
-  }
-
-  override def output: Seq[Attribute] = 
children.map(_.output).transpose.map(mergeAttributes)
-
-  override def metadataOutput: Seq[Attribute] = {
-    val childrenMetadataOutput = children.map(_.metadataOutput)
-    // This follows similar code in `CheckAnalysis` to check if the output of 
a Union is correct,
-    // but just silently doesn't return an output instead of throwing an 
error. It also ensures
-    // that the attribute and data type names are the same.
-    val refDataTypes = childrenMetadataOutput.head.map(_.dataType)
-    val refAttrNames = childrenMetadataOutput.head.map(_.name)
-    childrenMetadataOutput.tail.foreach { childMetadataOutput =>
-      // We can only propagate the metadata output correctly if every child 
has the same
-      // number of columns
-      if (childMetadataOutput.length != refDataTypes.length) return Nil
-      // Check if the data types match by name and type
-      val childDataTypes = childMetadataOutput.map(_.dataType)
-      childDataTypes.zip(refDataTypes).foreach { case (dt1, dt2) =>
-        if (!DataType.equalsStructurally(dt1, dt2, true) ||
-           !DataType.equalsStructurallyByName(dt1, dt2, conf.resolver)) {
-          return Nil
-        }
-      }
-      // Check that the names of the attributes match
-      val childAttrNames = childMetadataOutput.map(_.name)
-      childAttrNames.zip(refAttrNames).foreach { case (attrName1, attrName2) =>
-        if (!conf.resolver(attrName1, attrName2)) {
-          return Nil
-        }
+  // updating nullability to make all the children consistent
+  override def output: Seq[Attribute] = {
+    children.map(_.output).transpose.map { attrs =>
+      val firstAttr = attrs.head
+      val nullable = attrs.exists(_.nullable)
+      val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
+      if (firstAttr.dataType == newDt) {
+        firstAttr.withNullability(nullable)
+      } else {
+        AttributeReference(firstAttr.name, newDt, nullable, 
firstAttr.metadata)(
+          firstAttr.exprId, firstAttr.qualifier)
       }
     }
-    // If the metadata output matches, merge the attributes and return them
-    childrenMetadataOutput.transpose.map(mergeAttributes)
   }
 
+  override def metadataOutput: Seq[Attribute] = Nil
+
   override lazy val resolved: Boolean = {
     // allChildrenCompatible needs to be evaluated after childrenResolved
     def allChildrenCompatible: Boolean =
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index f169db11085..cd7d80a8296 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -73,7 +73,7 @@ abstract class InMemoryBaseTable(
 
   // purposely exposes a metadata column that conflicts with a data column in 
some tests
   override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, 
PartitionKeyColumn)
-  private lazy val metadataColumnNames = metadataColumns.map(_.name).toSet -- 
schema.map(_.name)
+  private val metadataColumnNames = metadataColumns.map(_.name).toSet -- 
schema.map(_.name)
 
   private val allowUnsupportedTransforms =
     properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 982f61728d9..9abf0fd59e6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -17,28 +17,13 @@
 
 package org.apache.spark.sql.connector
 
-import java.io.{File, FilenameFilter}
-
 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog, 
InMemoryTable, MetadataColumn, Table}
-import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.struct
-import org.apache.spark.sql.types.{DataType, IntegerType, StringType, 
StructField, StructType}
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
   import testImplicits._
 
-  override protected def beforeEach(): Unit = {
-    super.beforeEach()
-    spark.conf.set("spark.sql.catalog.testCatalog", 
classOf[MetadataTestCatalog].getName)
-    spark.conf.set("spark.sql.catalog.typeMismatch", 
classOf[MetadataTypeMismatchCatalog].getName)
-    spark.conf.set(
-      "spark.sql.catalog.nameMismatch", 
classOf[MetadataAttrNameMismatchCatalog].getName)
-    spark.conf.set(
-      "spark.sql.catalog.fieldNameMismatch", 
classOf[MetadataFieldNameMismatchCatalog].getName)
-  }
-
   private val tbl = "testcat.t"
 
   private def prepareTable(): Unit = {
@@ -264,272 +249,4 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       }
     }
   }
-
-  test("SPARK-41498: Metadata column is propagated through union") {
-    withTable(tbl) {
-      prepareTable()
-      val df = spark.table(tbl)
-      val dfQuery = df.union(df).select("id", "data", "index", "_partition")
-      val expectedAnswer = Seq(Row(1, "a", 0, "3/1"), Row(2, "b", 0, "0/2"), 
Row(3, "c", 0, "1/3"))
-      checkAnswer(dfQuery, expectedAnswer ++ expectedAnswer)
-    }
-  }
-
-  test("SPARK-41498: Nested metadata column is propagated through union") {
-    withTempDir { dir =>
-      spark.range(start = 0, end = 10, step = 1, numPartitions = 1)
-        .write.mode("overwrite").save(dir.getAbsolutePath)
-      val df = spark.read.load(dir.getAbsolutePath)
-      val dfQuery = df.union(df).select("_metadata.file_path")
-
-      val filePath = dir.listFiles(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = 
name.endsWith(".parquet")
-      }).map(_.getAbsolutePath)
-      assert(filePath.length == 1)
-      val expectedAnswer = (1 to 20).map(_ => Row("file:" ++ filePath.head))
-      checkAnswer(dfQuery, expectedAnswer)
-    }
-  }
-
-  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
-    "have metadata output of different size") {
-    withTable(tbl) {
-      prepareTable()
-      withTempDir { dir =>
-        spark.range(start = 10, end = 20).selectExpr("bigint(id) as id", 
"string(id) as data")
-          .write.mode("overwrite").save(dir.getAbsolutePath)
-        val df1 = spark.table(tbl)
-        val df2 = spark.read.load(dir.getAbsolutePath)
-
-        // Make sure one df contains a metadata column and the other does not
-        assert(!df1.queryExecution.analyzed.metadataOutput.exists(_.name == 
"_metadata"))
-        assert(df2.queryExecution.analyzed.metadataOutput.exists(_.name == 
"_metadata"))
-
-        assert(df1.union(df2).queryExecution.analyzed.metadataOutput.isEmpty)
-      }
-    }
-  }
-
-  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
-    "have a type mismatch in a metadata column") {
-    val tbl = "testCatalog.t"
-    val typeMismatchTbl = "typeMismatch.t"
-    withTable(tbl, typeMismatchTbl) {
-      spark.range(10).write.saveAsTable(tbl)
-      val df = spark.table(tbl)
-      spark.range(10).write.saveAsTable(typeMismatchTbl)
-      val typeMismatchDf = spark.table(typeMismatchTbl)
-      
assert(df.union(typeMismatchDf).queryExecution.analyzed.metadataOutput.isEmpty)
-    }
-  }
-
-  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
-    "have an attribute name mismatch in a metadata column") {
-    val tbl = "testCatalog.t"
-    val nameMismatchTbl = "nameMismatch.t"
-    withTable(tbl, nameMismatchTbl) {
-      spark.range(10).write.saveAsTable(tbl)
-      val df = spark.table(tbl)
-      spark.range(10).write.saveAsTable(nameMismatchTbl)
-      val nameMismatchDf = spark.table(nameMismatchTbl)
-      
assert(df.union(nameMismatchDf).queryExecution.analyzed.metadataOutput.isEmpty)
-    }
-  }
-
-  test("SPARK-41498: Metadata column is not propagated when children of Union 
" +
-    "have a field name mismatch in a metadata column") {
-    val tbl = "testCatalog.t"
-    val fieldNameMismatchTbl = "fieldNameMismatch.t"
-    withTable(tbl, fieldNameMismatchTbl) {
-      spark.range(10).write.saveAsTable(tbl)
-      val df = spark.table(tbl)
-      spark.range(10).write.saveAsTable(fieldNameMismatchTbl)
-      val fieldNameMismatchDf = spark.table(fieldNameMismatchTbl)
-      
assert(df.union(fieldNameMismatchDf).queryExecution.analyzed.metadataOutput.isEmpty)
-    }
-  }
-
-  test("SPARK-41538: Metadata column should be appended at the end of 
project") {
-    val tableName = "table_1"
-    val viewName = "view_1"
-    withTable(tableName) {
-      withView(viewName) {
-        sql(s"CREATE TABLE $tableName (a ARRAY<STRING>, s STRUCT<id: STRING>) 
USING parquet")
-        val id = "id1"
-        sql(s"INSERT INTO $tableName values(ARRAY('a'), named_struct('id', 
'$id'))")
-        sql(
-          s"""
-             |CREATE VIEW $viewName (id)
-             |AS WITH source AS (
-             |    SELECT * FROM $tableName
-             |),
-             |renamed AS (
-             |    SELECT s.id FROM source
-             |)
-             |SELECT id FROM renamed
-             |""".stripMargin)
-        val query =
-          s"""
-             |with foo AS (
-             |  SELECT '$id' as id
-             |),
-             |bar AS (
-             |  SELECT '$id' as id
-             |)
-             |SELECT
-             |  1
-             |FROM foo
-             |FULL OUTER JOIN bar USING(id)
-             |FULL OUTER JOIN $viewName USING(id)
-             |WHERE foo.id IS NOT NULL
-             |""".stripMargin
-        checkAnswer(sql(query), Row(1))
-      }
-    }
-  }
-
-  test("SPARK-42331: Fix metadata col can not been resolved") {
-    withTable(tbl) {
-      prepareTable()
-
-      checkAnswer(
-        spark.table(tbl).where("index = 0").select("index"),
-        Seq(Row(0), Row(0), Row(0)))
-      checkAnswer(
-        spark.table(tbl).where("index = 0").select("_partition"),
-        Seq(Row("3/1"), Row("0/2"), Row("1/3")))
-    }
-  }
-}
-
-class MetadataTestTable(
-    name: String,
-    schema: StructType,
-    partitioning: Array[Transform],
-    properties: java.util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) {
-
-  override val metadataColumns: Array[MetadataColumn] =
-    Array(
-      new MetadataColumn {
-        override def name: String = "_metadata"
-        override def dataType: DataType = StructType(StructField("index", 
IntegerType) :: Nil)
-        override def comment: String = ""
-      }
-    )
-}
-
-class TypeMismatchTable(
-    name: String,
-    schema: StructType,
-    partitioning: Array[Transform],
-    properties: java.util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) {
-
-  override val metadataColumns: Array[MetadataColumn] =
-    Array(
-      new MetadataColumn {
-        override def name: String = "_metadata"
-        override def dataType: DataType = StructType(StructField("index", 
StringType) :: Nil)
-        override def comment: String =
-          "Used to create a type mismatch with the metadata col in 
`MetadataTestTable`"
-      }
-    )
-}
-
-class AttrNameMismatchTable(
-    name: String,
-    schema: StructType,
-    partitioning: Array[Transform],
-    properties: java.util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) {
-  override val metadataColumns: Array[MetadataColumn] =
-    Array(
-      new MetadataColumn {
-        override def name: String = "wrongName"
-        override def dataType: DataType = StructType(StructField("index", 
IntegerType) :: Nil)
-        override def comment: String =
-          "Used to create a name mismatch with the metadata col in 
`MetadataTestTable`"
-      })
-}
-
-class FieldNameMismatchTable(
-    name: String,
-    schema: StructType,
-    partitioning: Array[Transform],
-    properties: java.util.Map[String, String])
-  extends InMemoryTable(name, schema, partitioning, properties) {
-  override val metadataColumns: Array[MetadataColumn] =
-    Array(
-      new MetadataColumn {
-        override def name: String = "_metadata"
-        override def dataType: DataType = StructType(StructField("wrongName", 
IntegerType) :: Nil)
-        override def comment: String =
-          "Used to create a name mismatch with the struct field in the 
metadata col of " +
-            "`MetadataTestTable`"
-      })
-}
-
-class MetadataTestCatalog extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val tableName = s"$name.${ident.quoted}"
-    val tbl = new MetadataTestTable(tableName, schema, partitions, properties)
-    tables.put(ident, tbl)
-    namespaces.putIfAbsent(ident.namespace.toList, Map())
-    tbl
-  }
-}
-
-class MetadataTypeMismatchCatalog extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val tableName = s"$name.${ident.quoted}"
-    val tbl = new TypeMismatchTable(tableName, schema, partitions, properties)
-    tables.put(ident, tbl)
-    namespaces.putIfAbsent(ident.namespace.toList, Map())
-    tbl
-  }
-}
-
-class MetadataAttrNameMismatchCatalog extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val tableName = s"$name.${ident.quoted}"
-    val tbl = new AttrNameMismatchTable(tableName, schema, partitions, 
properties)
-    tables.put(ident, tbl)
-    namespaces.putIfAbsent(ident.namespace.toList, Map())
-    tbl
-  }
-}
-
-class MetadataFieldNameMismatchCatalog extends InMemoryCatalog {
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-    val tableName = s"$name.${ident.quoted}"
-    val tbl = new FieldNameMismatchTable(tableName, schema, partitions, 
properties)
-    tables.put(ident, tbl)
-    namespaces.putIfAbsent(ident.namespace.toList, Map())
-    tbl
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to