Repository: spark
Updated Branches:
  refs/heads/master bfc5569a5 -> 5a140b784


[SPARK-26263][SQL] Validate partition values with user provided schema

## What changes were proposed in this pull request?

Currently, if a user provides a data schema, partition column values are cast 
to the types it specifies. If the cast fails, e.g. converting a string to an 
int, the column value silently becomes null.

This PR proposes throwing an exception in such cases instead of silently 
converting the value to null:
1. These null partition column values don't make sense to users in most 
cases. It is better to surface the conversion failure so that users can adjust 
the schema or ETL jobs to fix it.
2. Such conversion failures always raise exceptions for non-partition 
data columns. Partition columns should behave the same way.

We can reproduce the case above with the following directory layout:
```
/tmp/testDir
├── p=bar
└── p=foo
```
If we run:
```
val schema = StructType(Seq(StructField("p", IntegerType, false)))
spark.read.schema(schema).csv("/tmp/testDir/").show()
```
Before this change, we get:
```
+----+
|   p|
+----+
|null|
|null|
+----+
```
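
The check this patch adds can be sketched in plain Scala without a Spark dependency. This is only an illustration of the rule in `parsePartitionColumn` (a non-null raw value whose cast yields null is an error when validation is on). `PartitionValidationSketch`, `castToInt` and `convert` are hypothetical names, and `castToInt` is a simplified stand-in for Spark's `Cast`, not its actual implementation.

```scala
import scala.util.Try

object PartitionValidationSketch {
  // Hypothetical stand-in for Spark's Cast(string -> int):
  // yields null instead of throwing when the string is not a valid Int.
  def castToInt(raw: String): java.lang.Integer =
    Try(raw.toInt).toOption.map(Int.box).orNull

  // Mirrors the rule added by the patch: fail only when validation is enabled,
  // the raw value is non-null, and the cast produced null.
  def convert(columnName: String, raw: String, validate: Boolean): java.lang.Integer = {
    val casted = castToInt(raw)
    if (validate && raw != null && casted == null) {
      throw new RuntimeException(
        s"Failed to cast value `$raw` to `IntegerType` for partition column `$columnName`")
    }
    casted
  }
}
```

With `validate = false` the sketch returns null for `p=foo`, matching the output above; with `validate = true` it throws. In Spark itself the flag corresponds to `spark.sql.sources.validatePartitionColumns`, which this patch defaults to `true`.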

## How was this patch tested?

Unit test

Closes #23215 from gengliangwang/SPARK-26263.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a140b78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a140b78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a140b78

Branch: refs/heads/master
Commit: 5a140b7844936cf2b65f08853b8cfd8c499d4f13
Parents: bfc5569
Author: Gengliang Wang <[email protected]>
Authored: Fri Dec 7 11:13:14 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Fri Dec 7 11:13:14 2018 +0800

----------------------------------------------------------------------
 docs/sql-migration-guide-upgrade.md             |  2 ++
 .../org/apache/spark/sql/internal/SQLConf.scala | 12 ++++++++
 .../PartitioningAwareFileIndex.scala            |  4 +--
 .../datasources/PartitioningUtils.scala         | 30 ++++++++++++++------
 .../execution/datasources/FileIndexSuite.scala  | 27 +++++++++++++++++-
 .../ParquetPartitionDiscoverySuite.scala        | 18 +++++++++---
 6 files changed, 77 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a140b78/docs/sql-migration-guide-upgrade.md
----------------------------------------------------------------------
diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index ed2ff13..3638b08 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -29,6 +29,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, users can create a map with duplicated 
keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior 
of map with duplicated keys is undefined, e.g. map look up respects the 
duplicated key appears first, `Dataset.collect` only keeps the duplicated key 
appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these 
built-in functions will remove duplicated map keys with last wins policy. Users 
may still read map values with duplicated keys from data sources which do not 
enforce it (e.g. Parquet), the behavior will be udefined.
 
+  - In Spark version 2.4 and earlier, partition column value is converted as 
null if it can't be casted to corresponding user provided schema. Since 3.0, 
partition column value is validated with user provided schema. An exception is 
thrown if the validation fails. You can disable such validation by setting 
`spark.sql.sources.validatePartitionColumns` to `false`.
+
   - In Spark version 2.4 and earlier, the `SET` command works without any 
warnings even if the specified key is for `SparkConf` entries and it has no 
effect because the command does not update `SparkConf`, but the behavior might 
confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You 
can disable such a check by setting 
`spark.sql.legacy.execution.setCommandRejectsSparkConfs` to `false`.
 
   - Spark applications which are built with Spark version 2.4 and prior, and 
call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, 
as they are not binary compatible with Spark 3.0.

http://git-wip-us.apache.org/repos/asf/spark/blob/5a140b78/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 451b051..6857b8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1396,6 +1396,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val VALIDATE_PARTITION_COLUMNS =
+    buildConf("spark.sql.sources.validatePartitionColumns")
+      .internal()
+      .doc("When this option is set to true, partition column values will be 
validated with " +
+        "user-specified schema. If the validation fails, a runtime exception 
is thrown." +
+        "When this option is set to false, the partition column value will be 
converted to null " +
+        "if it can not be casted to corresponding user-specified schema.")
+      .booleanConf
+      .createWithDefault(true)
+
   val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE =
     buildConf("spark.sql.streaming.continuous.executorQueueSize")
     .internal()
@@ -2014,6 +2024,8 @@ class SQLConf extends Serializable with Logging {
   def allowCreatingManagedTableUsingNonemptyLocation: Boolean =
     getConf(ALLOW_CREATING_MANAGED_TABLE_USING_NONEMPTY_LOCATION)
 
+  def validatePartitionColumns: Boolean = getConf(VALIDATE_PARTITION_COLUMNS)
+
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a140b78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 7b0e4db..b2e4155 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -127,13 +127,13 @@ abstract class PartitioningAwareFileIndex(
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
 
-    val caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis
     PartitioningUtils.parsePartitions(
       leafDirs,
       typeInference = 
sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
       basePaths = basePaths,
       userSpecifiedSchema = userSpecifiedSchema,
-      caseSensitive = caseSensitive,
+      caseSensitive = sparkSession.sqlContext.conf.caseSensitiveAnalysis,
+      validatePartitionColumns = 
sparkSession.sqlContext.conf.validatePartitionColumns,
       timeZoneId = timeZoneId)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a140b78/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index d66cb09..6458b65 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,12 +26,13 @@ import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -96,9 +97,10 @@ object PartitioningUtils {
       basePaths: Set[Path],
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
+      validatePartitionColumns: Boolean,
       timeZoneId: String): PartitionSpec = {
-    parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema,
-      caseSensitive, DateTimeUtils.getTimeZone(timeZoneId))
+    parsePartitions(paths, typeInference, basePaths, userSpecifiedSchema, 
caseSensitive,
+      validatePartitionColumns, DateTimeUtils.getTimeZone(timeZoneId))
   }
 
   private[datasources] def parsePartitions(
@@ -107,6 +109,7 @@ object PartitioningUtils {
       basePaths: Set[Path],
       userSpecifiedSchema: Option[StructType],
       caseSensitive: Boolean,
+      validatePartitionColumns: Boolean,
       timeZone: TimeZone): PartitionSpec = {
     val userSpecifiedDataTypes = if (userSpecifiedSchema.isDefined) {
       val nameToDataType = userSpecifiedSchema.get.fields.map(f => f.name -> 
f.dataType).toMap
@@ -121,7 +124,8 @@ object PartitioningUtils {
 
     // First, we need to parse every partition's path and see if we can find 
partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
-      parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, 
timeZone)
+      parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
+        validatePartitionColumns, timeZone)
     }.unzip
 
     // We create pairs of (path -> path's partition value) here
@@ -203,6 +207,7 @@ object PartitioningUtils {
       typeInference: Boolean,
       basePaths: Set[Path],
       userSpecifiedDataTypes: Map[String, DataType],
+      validatePartitionColumns: Boolean,
       timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
@@ -224,7 +229,8 @@ object PartitioningUtils {
         // Let's say currentPath is a path of "/table/a=1/", 
currentPath.getName will give us a=1.
         // Once we get the string, we try to parse it and find the partition 
column and value.
         val maybeColumn =
-          parsePartitionColumn(currentPath.getName, typeInference, 
userSpecifiedDataTypes, timeZone)
+          parsePartitionColumn(currentPath.getName, typeInference, 
userSpecifiedDataTypes,
+            validatePartitionColumns, timeZone)
         maybeColumn.foreach(columns += _)
 
         // Now, we determine if we should stop.
@@ -258,6 +264,7 @@ object PartitioningUtils {
       columnSpec: String,
       typeInference: Boolean,
       userSpecifiedDataTypes: Map[String, DataType],
+      validatePartitionColumns: Boolean,
       timeZone: TimeZone): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
@@ -272,10 +279,15 @@ object PartitioningUtils {
       val literal = if (userSpecifiedDataTypes.contains(columnName)) {
         // SPARK-26188: if user provides corresponding column schema, get the 
column value without
         //              inference, and then cast it as user specified data 
type.
-        val columnValue = inferPartitionColumnValue(rawColumnValue, false, 
timeZone)
-        val castedValue =
-          Cast(columnValue, userSpecifiedDataTypes(columnName), 
Option(timeZone.getID)).eval()
-        Literal.create(castedValue, userSpecifiedDataTypes(columnName))
+        val dataType = userSpecifiedDataTypes(columnName)
+        val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, 
false, timeZone)
+        val columnValue = columnValueLiteral.eval()
+        val castedValue = Cast(columnValueLiteral, dataType, 
Option(timeZone.getID)).eval()
+        if (validatePartitionColumns && columnValue != null && castedValue == 
null) {
+          throw new RuntimeException(s"Failed to cast value `$columnValue` to 
`$dataType` " +
+            s"for partition column `$columnName`")
+        }
+        Literal.create(castedValue, dataType)
       } else {
         inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/5a140b78/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index ec552f7..6bd0a25 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
 import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
 
 class FileIndexSuite extends SharedSQLContext {
@@ -95,6 +95,31 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26263: Throw exception when partition value can't be casted to 
user-specified type") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("a", IntegerType, false)))
+      withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "true") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, 
Some(schema))
+        val msg = intercept[RuntimeException] {
+          fileIndex.partitionSpec()
+        }.getMessage
+        assert(msg == "Failed to cast value `foo` to `IntegerType` for 
partition column `a`")
+      }
+
+      withSQLConf(SQLConf.VALIDATE_PARTITION_COLUMNS.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, 
Some(schema))
+        val partitionValues = 
fileIndex.partitionSpec().partitions.map(_.values)
+        assert(partitionValues.length == 1 && partitionValues(0).numFields == 
1 &&
+          partitionValues(0).isNullAt(0))
+      }
+    }
+  }
+
   test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")

http://git-wip-us.apache.org/repos/asf/spark/blob/5a140b78/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f808ca4..8806735 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -101,7 +101,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       "hdfs://host:9000/path/a=10.5/b=hello")
 
     var exception = intercept[AssertionError] {
-      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, 
true, timeZoneId)
+      parsePartitions(paths.map(new Path(_)), true, Set.empty[Path], None, 
true, true, timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
 
@@ -117,6 +117,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/")),
       None,
       true,
+      true,
       timeZoneId)
 
     // Valid
@@ -132,6 +133,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/something=true/table")),
       None,
       true,
+      true,
       timeZoneId)
 
     // Valid
@@ -147,6 +149,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       Set(new Path("hdfs://host:9000/path/table=true")),
       None,
       true,
+      true,
       timeZoneId)
 
     // Invalid
@@ -162,6 +165,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
         Set(new Path("hdfs://host:9000/path/")),
         None,
         true,
+        true,
         timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
@@ -184,6 +188,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
         Set(new Path("hdfs://host:9000/tmp/tables/")),
         None,
         true,
+        true,
         timeZoneId)
     }
     assert(exception.getMessage().contains("Conflicting directory structures 
detected"))
@@ -191,13 +196,14 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
 
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      val actual = parsePartition(new Path(path), true, Set.empty[Path], 
Map.empty, timeZone)._1
+      val actual = parsePartition(new Path(path), true, Set.empty[Path],
+        Map.empty, true, timeZone)._1
       assert(expected === actual)
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): 
Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, 
timeZone)
+        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, 
timeZone)
       }.getMessage
 
       assert(message.contains(expected))
@@ -242,6 +248,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       typeInference = true,
       basePaths = Set(new Path("file://path/a=10")),
       Map.empty,
+      true,
       timeZone = timeZone)._1
 
     assert(partitionSpec1.isEmpty)
@@ -252,6 +259,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
       typeInference = true,
       basePaths = Set(new Path("file://path")),
       Map.empty,
+      true,
       timeZone = timeZone)._1
 
     assert(partitionSpec2 ==
@@ -272,6 +280,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
           rootPaths,
           None,
           true,
+          true,
           timeZoneId)
       assert(actualSpec.partitionColumns === spec.partitionColumns)
       assert(actualSpec.partitions.length === spec.partitions.length)
@@ -384,7 +393,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with 
ParquetTest with Sha
   test("parse partitions with type inference disabled") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
       val actualSpec =
-        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None, 
true, timeZoneId)
+        parsePartitions(paths.map(new Path(_)), false, Set.empty[Path], None,
+          true, true, timeZoneId)
       assert(actualSpec === spec)
     }
 

