This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f2492ddbfe9d [SPARK-47546][SQL] Improve validation when reading 
Variant from Parquet
f2492ddbfe9d is described below

commit f2492ddbfe9d8a788f720025f79a1ef2025f9662
Author: cashmand <[email protected]>
AuthorDate: Thu Mar 28 11:37:36 2024 +0800

    [SPARK-47546][SQL] Improve validation when reading Variant from Parquet
    
    ### What changes were proposed in this pull request?
    
    The parquet schema converter for Variant assumes that the first two fields 
in a parquet struct are the value and metadata for Variant, respectively. This 
isn't very robust, and could break if other writers wrote them in a different 
order, or if a future extension added new fields to the Variant structure. This 
PR adds more careful validation of the Variant column based on field names, and 
fails if there are any unexpected columns.
    
    ### Why are the changes needed?
    
    Make Variant more robust to issues in the parquet schema.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, Variant isn't released yet.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45703 from cashmand/SPARK-47546.
    
    Authored-by: cashmand <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../parquet/ParquetSchemaConverter.scala           | 34 ++++++++++--
 .../scala/org/apache/spark/sql/VariantSuite.scala  | 61 +++++++++++++++++++++-
 2 files changed, 90 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 5a1da83912b4..32c092a9fd6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -179,10 +179,7 @@ class ParquetToSparkSchemaConverter(
     field match {
       case primitiveColumn: PrimitiveColumnIO => 
convertPrimitiveField(primitiveColumn, targetType)
       case groupColumn: GroupColumnIO if targetType.contains(VariantType) =>
-        ParquetColumn(VariantType, groupColumn, Seq(
-          convertField(groupColumn.getChild(0), Some(BinaryType)),
-          convertField(groupColumn.getChild(1), Some(BinaryType))
-        ))
+        convertVariantField(groupColumn)
       case groupColumn: GroupColumnIO => convertGroupField(groupColumn, 
targetType)
     }
   }
@@ -404,6 +401,35 @@ class ParquetToSparkSchemaConverter(
     }
   }
 
+  /**
+   * Converts a Parquet group that is being read as a Variant into a [[ParquetColumn]].
+   *
+   * The children are located by name rather than by position, so "value" and "metadata" may
+   * appear in either order in the file. Each of them must be a REQUIRED BINARY primitive. The
+   * children of the returned column are always ordered as (value, metadata).
+   *
+   * Failures are raised through [[QueryCompilationErrors]]:
+   *  - `parquetTypeUnsupportedYetError` if the group has more than two children;
+   *  - `illegalParquetTypeError` if "value" or "metadata" is missing, or is not a
+   *    non-nullable binary.
+   *
+   * @param groupColumn the Parquet group to interpret as a Variant
+   * @return a [[ParquetColumn]] of [[VariantType]] wrapping the value and metadata columns
+   */
+  private def convertVariantField(groupColumn: GroupColumnIO): ParquetColumn = {
+    if (groupColumn.getChildrenCount > 2) {
+      // We may allow more than two children in the future, so consider this unsupported.
+      throw QueryCompilationErrors.
+        parquetTypeUnsupportedYetError("variant with more than two fields")
+    }
+    // A group with fewer than two children is not rejected above: it falls through to the
+    // by-name lookup below, which reports exactly which required field is missing instead of
+    // the misleading "more than two fields" message.
+    val valueAndMetadata = Seq("value", "metadata").map { colName =>
+      val child = (0 until groupColumn.getChildrenCount)
+        .map(i => groupColumn.getChild(i))
+        .find(_.getName == colName)
+        .getOrElse {
+          throw QueryCompilationErrors.illegalParquetTypeError(
+            s"variant missing $colName field")
+        }
+      // The value and metadata cannot be individually null, only the full struct can.
+      child match {
+        case p: PrimitiveColumnIO
+            if p.getType.getRepetition == REQUIRED && p.getPrimitive == BINARY =>
+          p
+        case _ =>
+          throw QueryCompilationErrors.illegalParquetTypeError(
+            s"variant $colName must be a non-nullable binary")
+      }
+    }
+    ParquetColumn(VariantType, groupColumn, Seq(
+      convertField(valueAndMetadata(0), Some(BinaryType)),
+      convertField(valueAndMetadata(1), Some(BinaryType))
+    ))
+  }
+
   // scalastyle:off
   // Here we implement Parquet LIST backwards-compatibility rules.
   // See: 
https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index 3991b44d0bbb..f95111f9f91b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -25,7 +25,7 @@ import scala.util.Random
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType, VariantType}
 import org.apache.spark.unsafe.types.VariantVal
 import org.apache.spark.util.ArrayImplicits._
 
@@ -192,4 +192,63 @@ class VariantSuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-47546: invalid variant binary") {
+    // Write a struct-of-binary that looks like a Variant, but with minor variations that may make
+    // it invalid to read.
+    // Test cases:
+    // 1) A binary that is almost correct, but contains an extra field "paths"
+    // 2,3) A binary with incorrect field names
+    // 4) Incorrect data type
+    // 5,6) Nullable value or metadata
+
+    // Binary value of empty metadata
+    val m = "X'010000'"
+    // Binary value of a literal "false"
+    val v = "X'8'"
+    val cases = Seq(
+        s"named_struct('value', $v, 'metadata', $m, 'paths', $v)",
+        s"named_struct('value', $v, 'dictionary', $m)",
+        s"named_struct('val', $v, 'metadata', $m)",
+        s"named_struct('value', 8, 'metadata', $m)",
+        s"named_struct('value', cast(null as binary), 'metadata', $m)",
+        s"named_struct('value', $v, 'metadata', cast(null as binary))"
+    )
+    cases.foreach { structDef =>
+      withTempDir { dir =>
+        val file = new File(dir, "dir").getCanonicalPath
+        val df = spark.sql(s"select $structDef as v from range(10)")
+        df.write.parquet(file)
+        val schema = StructType(Seq(StructField("v", VariantType)))
+        val result = spark.read.schema(schema).parquet(file).selectExpr("to_json(v)")
+        val e = intercept[org.apache.spark.SparkException](result.collect())
+        // The clue is a plain string naming the failing case and the actual cause. Passing
+        // e.printStackTrace here would dump a stack trace even when the assertion succeeds
+        // (if the clue is evaluated eagerly), and would make the clue itself Unit.
+        assert(e.getCause.isInstanceOf[AnalysisException],
+          s"unexpected failure cause for $structDef: ${e.getCause}")
+      }
+    }
+  }
+
+  test("SPARK-47546: valid variant binary") {
+    // These struct-of-binary layouts must all be readable as a Variant. Nobody is expected to
+    // build a Variant this way; the cases mimic slight variations another writer could
+    // produce, such as a different field order.
+
+    // Binary value of empty metadata
+    val emptyMetadata = "X'010000'"
+    // Binary value of a literal "false"
+    val falseValue = "X'8'"
+    val variantSchema = StructType(Seq(StructField("v", VariantType)))
+    val expected = Seq.fill(10)(Row("false"))
+    for (structDef <- Seq(
+        s"named_struct('value', $falseValue, 'metadata', $emptyMetadata)",
+        s"named_struct('metadata', $emptyMetadata, 'value', $falseValue)")) {
+      withTempDir { dir =>
+        val path = new File(dir, "dir").getCanonicalPath
+        spark.sql(s"select $structDef as v from range(10)").write.parquet(path)
+        val asJson = spark.read.schema(variantSchema).parquet(path).selectExpr("to_json(v)")
+        checkAnswer(asJson, expected)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to