This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 79ce792  [SPARK-30810][SQL] Parses and convert a CSV Dataset having 
different column from 'value' in csv(dataset) API
79ce792 is described below

commit 79ce79234f02092e22fdd79e859d83f5a174ef95
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Fri Feb 14 18:20:18 2020 +0800

    [SPARK-30810][SQL] Parses and convert a CSV Dataset having different column 
from 'value' in csv(dataset) API
    
    ### What changes were proposed in this pull request?
    
    This PR fixes `DataFrameReader.csv(dataset: Dataset[String])` API to take a 
`Dataset[String]` originated from a column name different from `value`. This is 
a long-standing bug started from the very first place.
    
    `CSVUtils.filterCommentAndEmpty` assumed the `Dataset[String]` to be 
originated with `value` column. This PR changes to use the first column name in 
the schema.
    
    ### Why are the changes needed?
    
    For  `DataFrameReader.csv(dataset: Dataset[String])` to support any 
`Dataset[String]` as the signature indicates.
    
    ### Does this PR introduce any user-facing change?
    Yes,
    
    ```scala
    val ds = spark.range(2).selectExpr("concat('a,b,', id) AS text").as[String]
    spark.read.option("header", true).option("inferSchema", true).csv(ds).show()
    ```
    
    Before:
    
    ```
    org.apache.spark.sql.AnalysisException: cannot resolve '`value`' given 
input columns: [text];;
    'Filter (length(trim('value, None)) > 0)
    +- Project [concat(a,b,, cast(id#0L as string)) AS text#2]
       +- Range (0, 2, step=1, splits=Some(2))
    ```
    
    After:
    
    ```
    +---+---+---+
    |  a|  b|  0|
    +---+---+---+
    |  a|  b|  1|
    +---+---+---+
    ```
    
    ### How was this patch tested?
    
    Unittest was added.
    
    Closes #27561 from HyukjinKwon/SPARK-30810.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 2a270a731a3b1da9a0fb036d648dd522e5c4d5ad)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala  | 7 ++++---
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala  | 7 +++++++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index 21fabac..d8b52c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -33,11 +33,12 @@ object CSVUtils {
     // with the one below, `filterCommentAndEmpty` but execution path is 
different. One of them
     // might have to be removed in the near future if possible.
     import lines.sqlContext.implicits._
-    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
+    val aliased = lines.toDF("value")
+    val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
-      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString))
+      
nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).as[String]
     } else {
-      nonEmptyLines
+      nonEmptyLines.as[String]
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b1105b4..0be0e1e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2294,6 +2294,13 @@ abstract class CSVSuite extends QueryTest with 
SharedSparkSession with TestCsvDa
       }
     }
   }
+
+  test("SPARK-30810: parses and convert a CSV Dataset having different column 
from 'value'") {
+    val ds = spark.range(2).selectExpr("concat('a,b,', id) AS 
`a.text`").as[String]
+    val csv = spark.read.option("header", true).option("inferSchema", 
true).csv(ds)
+    assert(csv.schema.fieldNames === Seq("a", "b", "0"))
+    checkAnswer(csv, Row("a", "b", 1))
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to