This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 3336a21 [SPARK-26990][SQL][BACKPORT-2.4] FileIndex: use user specified field names if possible 3336a21 is described below commit 3336a2167ed76f0d0fdbf3e28a18fab8b2eac540 Author: Gengliang Wang <gengliang.w...@databricks.com> AuthorDate: Thu Feb 28 09:37:07 2019 +0800 [SPARK-26990][SQL][BACKPORT-2.4] FileIndex: use user specified field names if possible ## What changes were proposed in this pull request? Back-port of #23894 to branch-2.4. With the following file structure: ``` /tmp/data └── a=5 ``` In the previous release: ``` scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema root |-- ID: long (nullable = true) |-- A: integer (nullable = true) ``` While in current code: ``` scala> spark.read.schema("A int, ID long").parquet("/tmp/data/").printSchema root |-- ID: long (nullable = true) |-- a: integer (nullable = true) ``` We can see that the partition column name `a` is different from `A` as user specified. This PR is to fix the case and make it more user-friendly. Closes #23894 from gengliangwang/fileIndexSchema. Authored-by: Gengliang Wang <gengliang.wang@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> ## How was this patch tested? Unit test Closes #23909 from bersprockets/backport-SPARK-26990. 
Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/datasources/PartitioningUtils.scala | 11 ++++++++++- .../spark/sql/execution/datasources/FileIndexSuite.scala | 15 +++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 9d2c9ba..41b0bb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -119,6 +119,13 @@ object PartitioningUtils { Map.empty[String, DataType] } + // SPARK-26990: use user specified field names if case insensitive. + val userSpecifiedNames = if (userSpecifiedSchema.isDefined && !caseSensitive) { + CaseInsensitiveMap(userSpecifiedSchema.get.fields.map(f => f.name -> f.name).toMap) + } else { + Map.empty[String, String] + } + // First, we need to parse every partition's path and see if we can find partition values. val (partitionValues, optDiscoveredBasePaths) = paths.map { path => parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, timeZone) @@ -163,7 +170,9 @@ object PartitioningUtils { columnNames.zip(literals).map { case (name, Literal(_, dataType)) => // We always assume partition columns are nullable since we've no idea whether null values // will be appended in the future. 
- StructField(name, userSpecifiedDataTypes.getOrElse(name, dataType), nullable = true) + val resultName = userSpecifiedNames.getOrElse(name, name) + val resultDataType = userSpecifiedDataTypes.getOrElse(name, dataType) + StructField(resultName, resultDataType, nullable = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index e2ffe63..09c6894 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -65,6 +65,21 @@ class FileIndexSuite extends SharedSQLContext { } } + test("SPARK-26990: use user specified field names if possible") { + withTempDir { dir => + val partitionDirectory = new File(dir, "a=foo") + partitionDirectory.mkdir() + val file = new File(partitionDirectory, "text.txt") + stringToFile(file, "text") + val path = new Path(dir.getCanonicalPath) + val schema = StructType(Seq(StructField("A", StringType, false))) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, Some(schema)) + assert(fileIndex.partitionSchema.length == 1 && fileIndex.partitionSchema.head.name == "A") + } + } + } + test("InMemoryFileIndex: input paths are converted to qualified paths") { withTempDir { dir => val file = new File(dir, "text.txt") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org