This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea4dee [SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types
5ea4dee is described below
commit 5ea4deec447fb413fd90b8c1a8d983d6bee89d91
Author: 10129659 <[email protected]>
AuthorDate: Wed Apr 10 19:54:19 2019 +0800
[SPARK-26012][SQL] Null and '' values should not cause dynamic partition failure of string types
Dynamic partitioning will fail when both '' and null values are used as dynamic partition values simultaneously.
For example, the test below will fail before this PR:
test("Null and '' values should not cause dynamic partition failure of
string types") {
withTable("t1", "t2") {
spark.range(3).write.saveAsTable("t1")
spark.sql("select id, cast(case when id = 1 then '' else null end as
string) as p" +
" from t1").write.partitionBy("p").saveAsTable("t2")
checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null),
Row(2, null)))
}
}
The error is: 'org.apache.hadoop.fs.FileAlreadyExistsException: File already exists'.
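Both '' and null are rendered as the same default partition directory name, so the writer ends up creating a second output file at a path that already exists. A rough standalone sketch of the collision (partitionDirName is a made-up helper for illustration, not Spark's API):

def partitionDirName(col: String, value: String): String = {
  // Both null and "" map to the Hive default partition name.
  val v = if (value == null || value.isEmpty) "__HIVE_DEFAULT_PARTITION__" else value
  s"$col=$v"
}
partitionDirName("p", null) // p=__HIVE_DEFAULT_PARTITION__
partitionDirName("p", "")   // p=__HIVE_DEFAULT_PARTITION__ -- same directory, hence the exception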
This PR converts empty strings to null for partition values.
This is an alternative approach to https://github.com/apache/spark/pull/23010.
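Conceptually, the conversion behaves like the simplified function below (the real code is the Empty2Null Catalyst expression in the diff, which operates on UTF8String):

def empty2Null(v: String): String = if (v != null && v.isEmpty) null else v

Seq[String](null, "", "a").map(empty2Null) // List(null, null, a): '' and null now fall into a single partition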
How was this patch tested?
Added a new test.
Closes #24334 from eatoncys/FileFormatWriter.
Authored-by: 10129659 <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/datasources/FileFormatWriter.scala | 36 +++++++++++++++++++---
.../datasources/FileFormatWriterSuite.scala | 19 +++++++++++-
2 files changed, 50 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index a9de649..f1fc5d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -35,9 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -49,6 +52,22 @@ object FileFormatWriter extends Logging {
customPartitionLocations: Map[TablePartitionSpec, String],
outputColumns: Seq[Attribute])
+  /** A function that converts the empty string to null for partition values. */
+  case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression {
+    override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v
+    override def nullable: Boolean = true
+    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+      nullSafeCodeGen(ctx, ev, c => {
+        s"""if ($c.numBytes() == 0) {
+           |  ${ev.isNull} = true;
+           |  ${ev.value} = null;
+           |} else {
+           |  ${ev.value} = $c;
+           |}""".stripMargin
+      })
+    }
+  }
+
/**
* Basic work flow of this command is:
* 1. Driver side setup, including output committer initialization and data source specific
@@ -84,6 +103,15 @@ object FileFormatWriter extends Logging {
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
+    var needConvert = false
+    val projectList: Seq[NamedExpression] = plan.output.map {
+      case p if partitionSet.contains(p) && p.dataType == StringType && p.nullable =>
+        needConvert = true
+        Alias(Empty2Null(p), p.name)()
+      case attr => attr
+    }
+    val empty2NullPlan = if (needConvert) ProjectExec(projectList, plan) else plan
+
val bucketIdExpression = bucketSpec.map { spec =>
val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
// Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
@@ -123,7 +151,7 @@ object FileFormatWriter extends Logging {
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
// the sort order doesn't matter
- val actualOrdering = plan.outputOrdering.map(_.child)
+ val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
false
} else {
@@ -141,7 +169,7 @@ object FileFormatWriter extends Logging {
try {
val rdd = if (orderingMatched) {
- plan.execute()
+ empty2NullPlan.execute()
} else {
// SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
// the physical plan may have different attribute ids due to optimizer removing some
@@ -151,7 +179,7 @@ object FileFormatWriter extends Logging {
SortExec(
orderingExpr,
global = false,
- child = plan).execute()
+ child = empty2NullPlan).execute()
}
// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 13f0e0b..e09ec0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -18,9 +18,14 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
import org.apache.spark.sql.test.SharedSQLContext
-class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
+class FileFormatWriterSuite
+  extends QueryTest
+  with SharedSQLContext
+  with CodegenInterpretedPlanTest{
+
import testImplicits._
test("empty file should be skipped while write to file") {
@@ -44,4 +49,16 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
checkAnswer(spark.table("t4"), Row(0, 0))
}
}
+
+ test("Null and '' values should not cause dynamic partition failure of
string types") {
+ withTable("t1", "t2") {
+ Seq((0, None), (1, Some("")), (2, None)).toDF("id", "p")
+ .write.partitionBy("p").saveAsTable("t1")
+ checkAnswer(spark.table("t1").sort("id"), Seq(Row(0, null), Row(1,
null), Row(2, null)))
+
+ sql("create table t2(id long, p string) using parquet partitioned by
(p)")
+ sql("insert overwrite table t2 partition(p) select id, p from t1")
+ checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1,
null), Row(2, null)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]