This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 3336a21  [SPARK-26990][SQL][BACKPORT-2.4] FileIndex: use user 
specified field names if possible
3336a21 is described below

commit 3336a2167ed76f0d0fdbf3e28a18fab8b2eac540
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Thu Feb 28 09:37:07 2019 +0800

    [SPARK-26990][SQL][BACKPORT-2.4] FileIndex: use user specified field names 
if possible
    
    ## What changes were proposed in this pull request?
    
    Back-port of #23894 to branch-2.4.
    
    With the following file structure:
    ```
    /tmp/data
    └── a=5
    ```
    
    In the previous release:
    ```
    scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
    root
     |-- ID: long (nullable = true)
     |-- A: integer (nullable = true)
    ```
    
    While in current code:
    ```
    scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema
    root
     |-- ID: long (nullable = true)
     |-- a: integer (nullable = true)
    ```
    
    We can see that the partition column name `a` differs from the `A` that the user 
specified. This PR fixes that case so the user-specified name is used, which is more user-friendly.
    
    Closes #23894 from gengliangwang/fileIndexSchema.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #23909 from bersprockets/backport-SPARK-26990.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/execution/datasources/PartitioningUtils.scala     | 11 ++++++++++-
 .../spark/sql/execution/datasources/FileIndexSuite.scala  | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9d2c9ba..41b0bb6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -119,6 +119,13 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
+    // SPARK-26990: use user specified field names if case insensitive.
+    val userSpecifiedNames = if (userSpecifiedSchema.isDefined && 
!caseSensitive) {
+      CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> 
f.name).toMap)
+    } else {
+      Map.empty[String, String]
+    }
+
     // First, we need to parse every partition's path and see if we can find 
partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
       parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, 
timeZone)
@@ -163,7 +170,9 @@ object PartitioningUtils {
         columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
           // We always assume partition columns are nullable since we've no 
idea whether null values
           // will be appended in the future.
-          StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), 
nullable = true)
+          val resultName = userSpecifiedNames.getOrElse(name, name)
+          val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType)
+          StructField(resultName, resultDataType, nullable = true)
         }
       }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index e2ffe63..09c6894 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-26990: use user specified field names if possible") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val schema = StructType(Seq(StructField("A", StringType, false)))
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, 
Some(schema))
+        assert(fileIndex.partitionSchema.length == 1 && 
fileIndex.partitionSchema.head.name == "A")
+      }
+    }
+  }
+
   test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to