This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c0bb6b  [SPARK-26384][SQL] Propagate SQL configs for CSV schema inferring
3c0bb6b is described below

commit 3c0bb6bc45e64fd82052d7857f2a06c34f0c1793
Author: Maxim Gekk <maxim.g...@databricks.com>
AuthorDate: Wed Dec 19 00:01:53 2018 +0800

    [SPARK-26384][SQL] Propagate SQL configs for CSV schema inferring
    
    ## What changes were proposed in this pull request?
    
    Currently, SQL configs are not propagated to executors during schema inference in the CSV datasource. For example, changing `spark.sql.legacy.timeParser.enabled` does not affect inference of timestamp types. In this PR, I propose to fix the issue by wrapping the schema inference action in `SQLExecution.withSQLConfPropagated`.
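
    As a rough sketch of the mechanism (the helper name `inferSchemaWithConfs` is hypothetical; only `SQLExecution.withSQLConfPropagated` comes from the actual change below):
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SQLExecution
    import org.apache.spark.sql.types.StructType

    // Hypothetical wrapper for illustration: withSQLConfPropagated captures the
    // session's SQL configs as job-local properties, so SQLConf.get on executors
    // returns the driver-side values for any RDD job launched inside the block.
    def inferSchemaWithConfs(spark: SparkSession)(infer: => StructType): StructType = {
      SQLExecution.withSQLConfPropagated(spark) {
        infer  // e.g. new CSVInferSchema(options).infer(tokenRDD, header)
      }
    }
    ```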
    
    ## How was this patch tested?
    
    Added logging to `TimestampFormatter`:
    ```patch
    -object TimestampFormatter {
    +object TimestampFormatter extends Logging {
       def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
         if (SQLConf.get.legacyTimeParserEnabled) {
    +      logError("LegacyFallbackTimestampFormatter is being used")
           new LegacyFallbackTimestampFormatter(format, timeZone, locale)
         } else {
    +      logError("Iso8601TimestampFormatter is being used")
           new Iso8601TimestampFormatter(format, timeZone, locale)
         }
       }
    ```
    and ran the commands below in `spark-shell`:
    ```shell
    $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
    ```
    ```scala
    scala> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")
    scala> spark.read.option("inferSchema", "true").option("header", "false").option("timestampFormat", "yyyy|MM|dd").csv("/tmp/foo").printSchema()
    18/12/18 10:47:27 ERROR TimestampFormatter: LegacyFallbackTimestampFormatter is being used
    root
     |-- _c0: timestamp (nullable = true)
    ```
    
    Closes #23345 from MaxGekk/csv-schema-infer-propagate-configs.
    
    Authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/execution/datasources/csv/CSVDataSource.scala      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index b46dfb9..375cec5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -35,6 +35,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVInferSchema, CSVOptions, UnivocityParser}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -135,7 +136,9 @@ object TextInputCSVDataSource extends CSVDataSource {
           val parser = new CsvParser(parsedOptions.asParserSettings)
           linesWithoutHeader.map(parser.parseLine)
         }
-        new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
+        SQLExecution.withSQLConfPropagated(csv.sparkSession) {
+          new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
+        }
       case _ =>
         // If the first line could not be read, just return the empty schema.
         StructType(Nil)
@@ -208,7 +211,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
             encoding = parsedOptions.charset)
         }
         val sampled = CSVUtils.sample(tokenRDD, parsedOptions)
-        new CSVInferSchema(parsedOptions).infer(sampled, header)
+        SQLExecution.withSQLConfPropagated(sparkSession) {
+          new CSVInferSchema(parsedOptions).infer(sampled, header)
+        }
       case None =>
         // If the first row could not be read, just return the empty schema.
         StructType(Nil)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
