This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea4dee  [SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types
5ea4dee is described below

commit 5ea4deec447fb413fd90b8c1a8d983d6bee89d91
Author: 10129659 <[email protected]>
AuthorDate: Wed Apr 10 19:54:19 2019 +0800

    [SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types
    
    Dynamic partition writes fail when both '' and null appear as dynamic partition values at the same time.
    For example, the test below fails before this PR:
    
    test("Null and '' values should not cause dynamic partition failure of 
string types") {
    withTable("t1", "t2") {
    spark.range(3).write.saveAsTable("t1")
    spark.sql("select id, cast(case when id = 1 then '' else null end as 
string) as p" +
    " from t1").write.partitionBy("p").saveAsTable("t2")
    checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), 
Row(2, null)))
    }
    }
    
    The error is 'org.apache.hadoop.fs.FileAlreadyExistsException: File already exists': both null and '' are rendered as the same default partition directory, so the writer treats them as two distinct partition values yet tries to create the same output file twice.
    This PR converts empty strings to null for partition values, so only null remains as the partition value.
    This is an alternative approach to https://github.com/apache/spark/pull/23010.
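    For illustration only (not part of the patch), a minimal standalone sketch of the normalization idea; the names below are hypothetical, while the actual fix is the Empty2Null expression added in the diff:

    // Hypothetical sketch: collapse '' into null before partition values are used,
    // so '' and null no longer reach the writer as two distinct values.
    def emptyToNull(v: String): String =
      if (v != null && v.isEmpty) null else v

    val partitionValues = Seq(null, "", "a")
    // Before: null and "" are distinct values that share one partition directory.
    // After:  only null and "a" remain as distinct partition values.
    val normalized = partitionValues.map(emptyToNull).distinct

    The patch applies this normalization as a projection (ProjectExec with Empty2Null) over the write plan, so the sort order and the produced partition directories both see the normalized value.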
    
    How was this patch tested?
    A newly added test in FileFormatWriterSuite.
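    (One way to run just this suite locally, assuming the standard Spark sbt build; the exact invocation is not part of this patch:)

    build/sbt "sql/testOnly *FileFormatWriterSuite"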
    
    Closes #24334 from eatoncys/FileFormatWriter.
    
    Authored-by: 10129659 <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/datasources/FileFormatWriter.scala   | 36 +++++++++++++++++++---
 .../datasources/FileFormatWriterSuite.scala        | 19 +++++++++++-
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a9de649..f1fc5d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -35,9 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 
@@ -49,6 +52,22 @@ object FileFormatWriter extends Logging {
       customPartitionLocations: Map[TablePartitionSpec, String],
       outputColumns: Seq[Attribute])
 
+  /** A function that converts the empty string to null for partition values. */
+  case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
+    override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
+    override def nullable: Boolean = true
+    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+      nullSafeCodeGen(ctx, ev, c => {
+        s"""if ($c.numBytes() == 0) {
+           |  ${ev.isNull} = true;
+           |  ${ev.value} = null;
+           |} else {
+           |  ${ev.value} = $c;
+           |}""".stripMargin
+      })
+    }
+  }
+
   /**
    * Basic work flow of this command is:
    * 1. Driver side setup, including output committer initialization and data source specific
@@ -84,6 +103,15 @@ object FileFormatWriter extends Logging {
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
 
+    var needConvert = false
+    val projectList: Seq[NamedExpression] = plan.output.map {
+      case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable =>
+        needConvert = true
+        Alias(Empty2Null(p), p.name)()
+      case attr => attr
+    }
+    val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan
+
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
       // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
@@ -123,7 +151,7 @@ object FileFormatWriter extends Logging {
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
     // the sort order doesn't matter
-    val actualOrdering = plan.outputOrdering.map(_.child)
+    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -141,7 +169,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val rdd = if (orderingMatched) {
-        plan.execute()
+        empty2NullPlan.execute()
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -151,7 +179,7 @@ object FileFormatWriter extends Logging {
         SortExec(
           orderingExpr,
           global = false,
-          child = plan).execute()
+          child = empty2NullPlan).execute()
       }
 
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 13f0e0b..e09ec0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -18,9 +18,14 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+class FileFormatWriterSuite
+  extends QueryTest
+  with SharedSQLContext
+  with CodegenInterpretedPlanTest{
+
   import testImplicits._
 
   test("empty file should be skipped while write to file") {
@@ -44,4 +49,16 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
       checkAnswer(spark.table("t4"), Row(0, 0))
     }
   }
+
+  test("Null and '' values should not cause dynamic partition failure of 
string types") {
+    withTable("t1", "t2") {
+      Seq((0, None), (1, Some("")), (2, None)).toDF("id", "p")
+        .write.partitionBy("p").saveAsTable("t1")
+      checkAnswer(spark.table("t1").sort("id"), Seq(Row(0, null), Row(1, 
null), Row(2, null)))
+
+      sql("create table t2(id long, p string) using parquet partitioned by 
(p)")
+      sql("insert overwrite table t2 partition(p) select id, p from t1")
+      checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, 
null), Row(2, null)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
