Repository: spark Updated Branches: refs/heads/master 9a09e91a3 -> 31c4fab3f
[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources ## What changes were proposed in this pull request? In the PR, I propose to postpone creation of `OutputStream`/`Univocity`/`JacksonGenerator` till the first row should be written. This prevents creation of empty files for empty partitions. So, no need to open and to read such files back while loading data from the location. ## How was this patch tested? Added tests for Text, JSON and CSV datasource where empty dataset is written but should not produce any files. Closes #23052 from MaxGekk/text-empty-files. Lead-authored-by: Maxim Gekk <max.g...@gmail.com> Co-authored-by: Maxim Gekk <maxim.g...@databricks.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31c4fab3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31c4fab3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31c4fab3 Branch: refs/heads/master Commit: 31c4fab3fb0343edf971de9070a319c6b3094647 Parents: 9a09e91 Author: Maxim Gekk <max.g...@gmail.com> Authored: Thu Nov 29 10:31:31 2018 -0600 Committer: Sean Owen <sean.o...@databricks.com> Committed: Thu Nov 29 10:31:31 2018 -0600 ---------------------------------------------------------------------- .../datasources/csv/CSVFileFormat.scala | 20 +++++++++++++------- .../datasources/json/JsonFileFormat.scala | 19 ++++++++++--------- .../datasources/text/TextFileFormat.scala | 16 ++++++++++++---- .../execution/datasources/csv/CSVSuite.scala | 9 +++++++++ .../execution/datasources/json/JsonSuite.scala | 13 +++++++++++-- .../execution/datasources/text/TextSuite.scala | 9 +++++++++ 6 files changed, 64 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index ff1911d..4c5a1d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter( context: TaskAttemptContext, params: CSVOptions) extends OutputWriter with Logging { - private val charset = Charset.forName(params.charset) - - private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) - - private val gen = new UnivocityGenerator(dataSchema, writer, params) + private var univocityGenerator: Option[UnivocityGenerator] = None + + override def write(row: InternalRow): Unit = { + val gen = univocityGenerator.getOrElse { + val charset = Charset.forName(params.charset) + val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset) + val newGen = new UnivocityGenerator(dataSchema, os, params) + univocityGenerator = Some(newGen) + newGen + } - override def write(row: InternalRow): Unit = gen.write(row) + gen.write(row) + } - override def close(): Unit = gen.close() + override def close(): Unit = univocityGenerator.map(_.close()) } http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 610f0d1..3042133 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -175,19 +175,20 @@ private[json] class JsonOutputWriter( " which can be read back by Spark only if multiLine is enabled.") } - private val writer = CodecStreams.createOutputStreamWriter( - context, new Path(path), encoding) - - // create the Generator without separator inserted between 2 records - private[this] val gen = new JacksonGenerator(dataSchema, writer, options) + private var jacksonGenerator: Option[JacksonGenerator] = None override def write(row: InternalRow): Unit = { + val gen = jacksonGenerator.getOrElse { + val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding) + // create the Generator without separator inserted between 2 records + val newGen = new JacksonGenerator(dataSchema, os, options) + jacksonGenerator = Some(newGen) + newGen + } + gen.write(row) gen.writeLineEnding() } - override def close(): Unit = { - gen.close() - writer.close() - } + override def close(): Unit = jacksonGenerator.map(_.close()) } http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 2682971..01948ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.text +import java.io.OutputStream + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -148,17 +150,23 @@ class TextOutputWriter( context: TaskAttemptContext) extends OutputWriter { - private val writer = CodecStreams.createOutputStream(context, new Path(path)) + private var outputStream: Option[OutputStream] = None override def write(row: InternalRow): Unit = { + val os = outputStream.getOrElse{ + val newStream = CodecStreams.createOutputStream(context, new Path(path)) + outputStream = Some(newStream) + newStream + } + if (!row.isNullAt(0)) { val utf8string = row.getUTF8String(0) - utf8string.writeTo(writer) + utf8string.writeTo(os) } - writer.write(lineSeparator) + os.write(lineSeparator) } override def close(): Unit = { - writer.close() + outputStream.map(_.close()) } } http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index c275d63..e14e8d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te }.getMessage assert(errMsg2.contains("'lineSep' can contain only 1 character")) } + + test("do not produce empty files for empty partitions") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark.emptyDataset[String].write.csv(path) + val files = new File(path).listFiles() + assert(!files.exists(_.getName.endsWith("csv"))) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index ee31077..ee5176e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1897,7 +1897,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .text(path) val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path) - assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file + assert(jsonDF.count() === corruptRecordCount) assert(jsonDF.schema === new StructType() .add("_corrupt_record", StringType) .add("dummy", StringType)) @@ -1910,7 +1910,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { F.count($"dummy").as("valid"), F.count($"_corrupt_record").as("corrupt"), F.count("*").as("count")) - checkAnswer(counts, Row(1, 5, 7)) // null row for empty file + checkAnswer(counts, Row(1, 4, 6)) // null row for empty file } } @@ -2555,4 +2555,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { emptyString(StringType, "") emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8)) } + + test("do not produce empty files for empty partitions") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark.emptyDataset[String].write.json(path) + val files = new File(path).listFiles() + assert(!files.exists(_.getName.endsWith("json"))) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 0e7f3af..a86d5ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext { assert(data(3) == Row("\"doh\"")) assert(data.length == 4) } + + test("do not produce empty files for empty partitions") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark.emptyDataset[String].write.text(path) + val files = new File(path).listFiles() + assert(!files.exists(_.getName.endsWith("txt"))) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org