This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new faa6198  [SPARK-26935][SQL] Skip DataFrameReader's CSV first line scan when not used
faa6198 is described below

commit faa61980c475f48b83694501d3c86e1709a595da
Author: Douglas R Colkitt <douglas.colk...@gmail.com>
AuthorDate: Sat Feb 23 14:00:57 2019 -0600

    [SPARK-26935][SQL] Skip DataFrameReader's CSV first line scan when not used
    
    Prior to this patch, all DataFrameReader.csv() calls would collect the first
    line from the CSV input iterator. This is done to allow schema inference from
    the header row.
    
    However, when a schema is already specified this is a wasteful operation. It
    results in an unnecessary compute step on the first partition. This can be
    expensive if the CSV itself is expensive to generate (e.g. it's the product
    of a long-running external pipe()).
    
    This patch short-circuits the first-line collection in DataFrameReader.csv()
    when a schema is specified, thereby improving CSV read performance in
    certain cases.
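    
    As an illustrative sketch (not from the patch itself; the schema fields and
    file path below are made up), the fast path applies when the caller supplies
    a schema and leaves the header option off:
    
    ```scala
    // With an explicit schema and header=false, none of the downstream
    // consumers (TextInputCSVDataSource, CSVHeaderChecker, CSVUtils) need
    // the first line, so the reader can skip collecting it entirely.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    val spark = SparkSession.builder().appName("csv-schema-example").getOrCreate()
    
    // Hypothetical schema for illustration.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))
    
    // Schema supplied and header disabled: maybeFirstLine stays None, so no
    // extra job is launched against the first partition of the input.
    val df = spark.read
      .schema(schema)
      .option("header", "false")
      .csv("/path/to/data.csv")
    ```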
    
    ## What changes were proposed in this pull request?
    
    Short-circuiting DataFrameReader.csv() first-line read when schema is
    user-specified.
    
    ## How was this patch tested?
    
    Compiled and tested against several CSV datasets.
    
    Closes #23830 from Mister-Meeseeks/master.
    
    Authored-by: Douglas R Colkitt <douglas.colk...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/DataFrameReader.scala  | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e757785..ff295b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -508,7 +508,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     val filteredLines: Dataset[String] =
       CSVUtils.filterCommentAndEmpty(csvDataset, parsedOptions)
-    val maybeFirstLine: Option[String] = filteredLines.take(1).headOption
+
+    // For performance, short-circuit the collection of the first line when it won't be used:
+    //   - TextInputCSVDataSource - Only uses firstLine to infer an unspecified schema
+    //   - CSVHeaderChecker       - Only uses firstLine to check header, when headerFlag is true
+    //   - CSVUtils               - Only uses firstLine to filter headers, when headerFlag is true
+    // (If the downstream logic grows more complicated, consider refactoring to an approach that
+    //  delegates this decision to the constituent consumers themselves.)
+    val maybeFirstLine: Option[String] =
+      if (userSpecifiedSchema.isEmpty || parsedOptions.headerFlag) {
+        filteredLines.take(1).headOption
+      } else {
+        None
+      }
 
     val schema = userSpecifiedSchema.getOrElse {
       TextInputCSVDataSource.inferFromDataset(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
