Repository: spark
Updated Branches:
  refs/heads/master 9a09e91a3 -> 31c4fab3f


[SPARK-26081][SQL] Prevent empty files for empty partitions in Text datasources

## What changes were proposed in this pull request?

In the PR, I propose to postpone creation of the 
`OutputStream`/`UnivocityGenerator`/`JacksonGenerator` until the first row is 
written. This prevents the creation of empty files for empty partitions, so 
such files no longer have to be opened and read back while loading data from 
the location.
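
The same lazy-initialization idiom is applied to all three writers. Below is a 
minimal, self-contained sketch of the idiom; the `Generator` trait and the 
`createGenerator` factory are hypothetical stand-ins for 
`UnivocityGenerator`/`JacksonGenerator` and their construction, not Spark APIs:

```scala
trait Generator {
  def write(row: String): Unit
  def close(): Unit
}

class LazyOutputWriter(createGenerator: () => Generator) {
  // Nothing is opened at construction time, so an empty partition
  // never creates a file.
  private var generator: Option[Generator] = None

  def write(row: String): Unit = {
    val gen = generator.getOrElse {
      val newGen = createGenerator() // opened on the first row only
      generator = Some(newGen)
      newGen
    }
    gen.write(row)
  }

  // If no row was ever written, there is nothing to close and no file to clean up.
  def close(): Unit = generator.foreach(_.close())
}
```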

## How was this patch tested?

Added tests for the Text, JSON and CSV datasources in which an empty dataset 
is written and must not produce any output files.
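
All three tests follow the same shape (shown in full in the diff below): write 
an empty dataset into a temporary directory and assert that no part files with 
the datasource's extension were produced, e.g. for Text:

```scala
withTempPath { dir =>
  val path = dir.getCanonicalPath
  spark.emptyDataset[String].write.text(path)
  val files = new File(path).listFiles()
  assert(!files.exists(_.getName.endsWith("txt")))
}
```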

Closes #23052 from MaxGekk/text-empty-files.

Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31c4fab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31c4fab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31c4fab3

Branch: refs/heads/master
Commit: 31c4fab3fb0343edf971de9070a319c6b3094647
Parents: 9a09e91
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Thu Nov 29 10:31:31 2018 -0600
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Thu Nov 29 10:31:31 2018 -0600

----------------------------------------------------------------------
 .../datasources/csv/CSVFileFormat.scala         | 20 +++++++++++++-------
 .../datasources/json/JsonFileFormat.scala       | 19 ++++++++++---------
 .../datasources/text/TextFileFormat.scala       | 16 ++++++++++++----
 .../execution/datasources/csv/CSVSuite.scala    |  9 +++++++++
 .../execution/datasources/json/JsonSuite.scala  | 13 +++++++++++--
 .../execution/datasources/text/TextSuite.scala  |  9 +++++++++
 6 files changed, 64 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index ff1911d..4c5a1d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -169,13 +169,19 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private val charset = Charset.forName(params.charset)
-
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-
-  private val gen = new UnivocityGenerator(dataSchema, writer, params)
+  private var univocityGenerator: Option[UnivocityGenerator] = None
+
+  override def write(row: InternalRow): Unit = {
+    val gen = univocityGenerator.getOrElse {
+      val charset = Charset.forName(params.charset)
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+      val newGen = new UnivocityGenerator(dataSchema, os, params)
+      univocityGenerator = Some(newGen)
+      newGen
+    }
 
-  override def write(row: InternalRow): Unit = gen.write(row)
+    gen.write(row)
+  }
 
-  override def close(): Unit = gen.close()
+  override def close(): Unit = univocityGenerator.map(_.close())
 }
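
A side note on the `close()` idiom above (and the analogous ones in the JSON 
and Text writers below): running `Option.map` purely for its side effect 
compiles, but `foreach` states the intent more directly, e.g.

  override def close(): Unit = univocityGenerator.foreach(_.close())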

http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 610f0d1..3042133 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -175,19 +175,20 @@ private[json] class JsonOutputWriter(
          " which can be read back by Spark only if multiLine is enabled.")
   }
 
-  private val writer = CodecStreams.createOutputStreamWriter(
-    context, new Path(path), encoding)
-
-  // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
+  private var jacksonGenerator: Option[JacksonGenerator] = None
 
   override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
+    }
+
     gen.write(row)
     gen.writeLineEnding()
   }
 
-  override def close(): Unit = {
-    gen.close()
-    writer.close()
-  }
+  override def close(): Unit = jacksonGenerator.map(_.close())
 }
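
Note that the separate `writer.close()` call disappears above: the output 
stream is now created inside the generator, and closing a Jackson generator 
also closes the `Writer` it wraps as long as Jackson's AUTO_CLOSE_TARGET 
feature (enabled by default) is on. A minimal sketch of that assumption, 
independent of Spark:

  import java.io.StringWriter
  import com.fasterxml.jackson.core.JsonFactory

  val writer = new StringWriter()
  val gen = new JsonFactory().createGenerator(writer)
  gen.close() // also closes `writer` because AUTO_CLOSE_TARGET is on by default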

http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 2682971..01948ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
+import java.io.OutputStream
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -148,17 +150,23 @@ class TextOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {
 
-  private val writer = CodecStreams.createOutputStream(context, new Path(path))
+  private var outputStream: Option[OutputStream] = None
 
   override def write(row: InternalRow): Unit = {
+    val os = outputStream.getOrElse {
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
+
     if (!row.isNullAt(0)) {
       val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(writer)
+      utf8string.writeTo(os)
     }
-    writer.write(lineSeparator)
+    os.write(lineSeparator)
   }
 
   override def close(): Unit = {
-    writer.close()
+    outputStream.map(_.close())
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index c275d63..e14e8d4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1986,4 +1986,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }.getMessage
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.csv(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("csv")))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ee31077..ee5176e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1897,7 +1897,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
+      assert(jsonDF.count() === corruptRecordCount)
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1910,7 +1910,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           F.count($"dummy").as("valid"),
           F.count($"_corrupt_record").as("corrupt"),
           F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6))
     }
   }
 
@@ -2555,4 +2555,13 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     emptyString(StringType, "")
     emptyString(BinaryType, "".getBytes(StandardCharsets.UTF_8))
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.json(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("json")))
+    }
+  }
 }
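
(A note on the two assertion changes earlier in this file: with `multiLine` 
enabled, each empty part file previously surfaced as a single null row in the 
corrupt-record column, so once empty files are no longer written the expected 
counts drop by one.)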

http://git-wip-us.apache.org/repos/asf/spark/blob/31c4fab3/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 0e7f3af..a86d5ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -233,4 +233,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
     assert(data(3) == Row("\"doh\""))
     assert(data.length == 4)
   }
+
+  test("do not produce empty files for empty partitions") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.emptyDataset[String].write.text(path)
+      val files = new File(path).listFiles()
+      assert(!files.exists(_.getName.endsWith("txt")))
+    }
+  }
 }

