Repository: spark
Updated Branches:
refs/heads/master 95e51ff84 -> 5c9eaa6b5
[SPARK-23372][SQL] Writing empty struct in parquet fails during execution. It
should fail earlier in the processing.
## What changes were proposed in this pull request?
Currently we allow writing data frames with an empty schema into a file-based
datasource for certain file formats such as JSON, ORC, etc. For formats such as
Parquet and Text, we raise an error, but at different points of execution. For
the text format, the driver returns the error early in processing, whereas for
Parquet the error is raised from the executor.
**Example**
spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
**Results in**
``` SQL
org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message spark_schema {
}
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
at org.apache.parquet.schema.MessageType.accept(MessageType.java:58)
at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:225)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:376)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:387)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:278)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:276)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:281)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:206)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.
```
In this PR, we unify the error handling and raise an error early in processing
on any attempt to write a dataframe with an empty schema into a file-based
datasource (orc, parquet, text, csv, json, etc.).
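
The check described above can be sketched without a Spark dependency. The types below (`DataType`, `IntegerType`, `StructField`, `StructType`) are simplified stand-ins invented for this illustration, not the classes in `org.apache.spark.sql.types`, and the sketch throws `IllegalArgumentException` where the patch throws Spark's `AnalysisException`. Like the patch, it only descends into fields whose type is directly a struct.

```scala
// Simplified stand-ins for Spark's schema types (assumptions for illustration only).
sealed trait DataType
case object IntegerType extends DataType
final case class StructField(name: String, dataType: DataType)
final case class StructType(fields: Seq[StructField]) extends DataType

object EmptySchemaCheck {
  // True when the struct has no fields, or when any directly nested struct is empty.
  def hasEmptySchema(schema: StructType): Boolean =
    schema.fields.isEmpty || schema.fields.exists {
      case StructField(_, nested: StructType) => hasEmptySchema(nested)
      case _ => false
    }

  // Intended to run on the driver before a write is planned, so the failure is
  // reported early rather than from inside a write task.
  def validateSchema(schema: StructType): Unit =
    if (hasEmptySchema(schema)) {
      throw new IllegalArgumentException(
        "Datasource does not support writing empty or nested empty schemas.")
    }
}
```

With these stand-ins, `StructType(Nil)` and a struct containing a field of type `StructType(Nil)` are both rejected, while a struct with a single `IntegerType` field passes.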
## How was this patch tested?
Unit tests added in FileBasedDataSourceSuite.
Author: Dilip Biswal <[email protected]>
Closes #20579 from dilipbiswal/spark-23372.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c9eaa6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c9eaa6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c9eaa6b
Branch: refs/heads/master
Commit: 5c9eaa6b585e9febd782da8eb6490b24d0d39ff3
Parents: 95e51ff
Author: Dilip Biswal <[email protected]>
Authored: Wed Mar 21 21:49:02 2018 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Wed Mar 21 21:49:02 2018 -0700
----------------------------------------------------------------------
docs/sql-programming-guide.md | 1 +
.../sql/execution/datasources/DataSource.scala | 26 +++++++++++++++++-
.../spark/sql/FileBasedDataSourceSuite.scala | 28 ++++++++++++++++++++
3 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5c9eaa6b/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 5b47fd7..421e2ea 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1807,6 +1807,7 @@ working with timestamps in `pandas_udf`s to get the best
performance, see
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just
failed when Arrow optimization is unabled to be used whereas `createDataFrame`
from Pandas DataFrame allowed the fallback to non-optimization. Now, both
`toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by
default, which can be switched off by
`spark.sql.execution.arrow.fallback.enabled`.
- Since Spark 2.4, writing an empty dataframe to a directory launches at
least one write task, even if physically the dataframe has no partition. This
introduces a small behavior change that for self-describing file formats like
Parquet and Orc, Spark creates a metadata-only file in the target directory
when writing a 0-partition dataframe, so that schema inference can still work
if users read that directory later. The new behavior is more reasonable and
more consistent regarding writing empty dataframe.
- Since Spark 2.4, expression IDs in UDF arguments do not appear in column
names. For example, an column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)`
but ``UDF:f(col0 AS `colA`)``.
+ - Since Spark 2.4, writing a dataframe with an empty or nested empty schema
using any file formats (parquet, orc, json, text, csv etc.) is not allowed. An
exception is thrown when attempting to write dataframes with empty schema.
## Upgrading From Spark SQL 2.2 to 2.3
http://git-wip-us.apache.org/repos/asf/spark/blob/5c9eaa6b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 35fcff6..31fa89b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
+import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils
@@ -546,6 +546,7 @@ case class DataSource(
case dataSource: CreatableRelationProvider =>
SaveIntoDataSourceCommand(data, dataSource, caseInsensitiveOptions, mode)
case format: FileFormat =>
+ DataSource.validateSchema(data.schema)
planForWritingFileFormat(format, mode, data)
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
@@ -719,4 +720,27 @@ object DataSource extends Logging {
}
globPath
}
+
+ /**
+ * Called before writing into a FileFormat based data source to make sure the
+ * supplied schema is not empty.
+ * @param schema
+ */
+ private def validateSchema(schema: StructType): Unit = {
+ def hasEmptySchema(schema: StructType): Boolean = {
+ schema.size == 0 || schema.find {
+ case StructField(_, b: StructType, _, _) => hasEmptySchema(b)
+ case _ => false
+ }.isDefined
+ }
+
+
+ if (hasEmptySchema(schema)) {
+ throw new AnalysisException(
+ s"""
+ |Datasource does not support writing empty or nested empty schemas.
+ |Please make sure the data schema has at least one or more column(s).
+ """.stripMargin)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/5c9eaa6b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index bd3071b..0630309 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with BeforeAndAfterAll {
@@ -108,6 +109,33 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
allFileBasedDataSources.foreach { format =>
+ test(s"SPARK-23372 error while writing empty schema files using $format") {
+ withTempPath { outputPath =>
+ val errMsg = intercept[AnalysisException] {
+ spark.emptyDataFrame.write.format(format).save(outputPath.toString)
+ }
+ assert(errMsg.getMessage.contains(
+ "Datasource does not support writing empty or nested empty schemas"))
+ }
+
+ // Nested empty schema
+ withTempPath { outputPath =>
+ val schema = StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", StructType(Nil)),
+ StructField("c", IntegerType)
+ ))
+ val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
+ val errMsg = intercept[AnalysisException] {
+ df.write.format(format).save(outputPath.toString)
+ }
+ assert(errMsg.getMessage.contains(
+ "Datasource does not support writing empty or nested empty schemas"))
+ }
+ }
+ }
+
+ allFileBasedDataSources.foreach { format =>
test(s"SPARK-22146 read files containing special characters using $format") {
withTempDir { dir =>
val tmpFile = s"$dir/$nameWithSpecialChars"