Repository: spark
Updated Branches:
  refs/heads/master 95e51ff84 -> 5c9eaa6b5


[SPARK-23372][SQL] Writing empty struct in parquet fails during execution. It 
should fail earlier in the processing.

## What changes were proposed in this pull request?
Currently we allow writing dataframes with an empty schema into a file-based 
datasource for certain file formats such as JSON, ORC, etc. For formats such as 
Parquet and text, we raise the error at different points of execution: for the 
text format, the error is returned from the driver early in processing, whereas 
for a format such as Parquet, the error is raised from the executor.

**Example**
```scala
spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
```
**Results in**
```
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message spark_schema {
 }

 at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
 at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
 at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
 at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
 at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:225)
 at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
 at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
 at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
 at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:376)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:387)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:278)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:276)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:281)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:206)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:205)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.
```

In this PR, we unify the error handling and raise the error early in 
processing, on the driver, whenever a dataframe with an empty schema (or a 
nested empty schema) is written to a file-based datasource (ORC, Parquet, 
text, CSV, JSON, etc.).
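
For illustration, a minimal sketch of the new behavior (assuming a running 
`SparkSession` named `spark` and a writable `path`): the failure now surfaces 
as an `AnalysisException` during planning on the driver, rather than an 
`InvalidSchemaException` thrown from a write task on an executor.

```scala
import org.apache.spark.sql.AnalysisException

try {
  // Fails up front on the driver, before any write task is launched.
  spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
} catch {
  case e: AnalysisException =>
    // "Datasource does not support writing empty or nested empty schemas. ..."
    println(e.getMessage)
}
```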

## How was this patch tested?

Unit tests added in FileBasedDataSourceSuite.

Author: Dilip Biswal <[email protected]>

Closes #20579 from dilipbiswal/spark-23372.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c9eaa6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c9eaa6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c9eaa6b

Branch: refs/heads/master
Commit: 5c9eaa6b585e9febd782da8eb6490b24d0d39ff3
Parents: 95e51ff
Author: Dilip Biswal <[email protected]>
Authored: Wed Mar 21 21:49:02 2018 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Wed Mar 21 21:49:02 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  1 +
 .../sql/execution/datasources/DataSource.scala  | 26 +++++++++++++++++-
 .../spark/sql/FileBasedDataSourceSuite.scala    | 28 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c9eaa6b/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 5b47fd7..421e2ea 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1807,6 +1807,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
   - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
  - Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change: for self-describing file formats like Parquet and ORC, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent with respect to writing an empty dataframe.
  - Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
+ - Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file format (parquet, orc, json, text, csv, etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5c9eaa6b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 35fcff6..31fa89b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
+import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.Utils
 
@@ -546,6 +546,7 @@ case class DataSource(
       case dataSource: CreatableRelationProvider =>
        SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
       case format: FileFormat =>
+        DataSource.validateSchema(data.schema)
         planForWritingFileFormat(format, mode, data)
       case _ =>
        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
@@ -719,4 +720,27 @@ object DataSource extends Logging {
     }
     globPath
   }
+
+  /**
+   * Called before writing into a FileFormat based data source to make sure the
+   * supplied schema is not empty.
+   * @param schema the schema of the data to be written
+   */
+  private def validateSchema(schema: StructType): Unit = {
+    def hasEmptySchema(schema: StructType): Boolean = {
+      schema.size == 0 || schema.find {
+        case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
+        case _ => false
+      }.isDefined
+    }
+
+    if (hasEmptySchema(schema)) {
+      throw new AnalysisException(
+        s"""
+           |Datasource does not support writing empty or nested empty schemas.
+           |Please make sure the data schema has at least one column.
+         """.stripMargin)
+    }
+  }
 }
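
As a side note on `validateSchema` above (an illustrative sketch, not part of 
the commit): `hasEmptySchema` rejects both a top-level empty schema and an 
empty struct nested inside an otherwise non-empty one.

```scala
import org.apache.spark.sql.types._

// Top-level empty schema: size == 0, rejected outright.
val topLevelEmpty = StructType(Nil)

// Nested empty struct: field "b" recurses into an empty StructType,
// so the whole schema is rejected as well.
val nestedEmpty = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StructType(Nil))))

// A schema with at least one leaf column passes validation.
val valid = StructType(Seq(StructField("a", IntegerType)))
```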

http://git-wip-us.apache.org/repos/asf/spark/blob/5c9eaa6b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index bd3071b..0630309 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 
 
 class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
@@ -108,6 +109,33 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   allFileBasedDataSources.foreach { format =>
+    test(s"SPARK-23372 error while writing empty schema files using $format") {
+      withTempPath { outputPath =>
+        val errMsg = intercept[AnalysisException] {
+          spark.emptyDataFrame.write.format(format).save(outputPath.toString)
+        }
+        assert(errMsg.getMessage.contains(
+          "Datasource does not support writing empty or nested empty schemas"))
+      }
+
+      // Nested empty schema
+      withTempPath { outputPath =>
+        val schema = StructType(Seq(
+          StructField("a", IntegerType),
+          StructField("b", StructType(Nil)),
+          StructField("c", IntegerType)
+        ))
+        val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
+        val errMsg = intercept[AnalysisException] {
+          df.write.format(format).save(outputPath.toString)
+        }
+        assert(errMsg.getMessage.contains(
+          "Datasource does not support writing empty or nested empty schemas"))
+      }
+    }
+  }
+
+  allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using 
$format") {
       withTempDir { dir =>
         val tmpFile = s"$dir/$nameWithSpecialChars"

