This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new da1f95b [SPARK-31116][SQL] Fix nested schema case-sensitivity in
ParquetRowConverter
da1f95b is described below
commit da1f95be6b9af59a91a14e01613bdc4e8ac35374
Author: Tae-kyeom, Kim <[email protected]>
AuthorDate: Mon Mar 16 10:31:56 2020 -0700
[SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
### What changes were proposed in this pull request?
This PR (SPARK-31116) adds a caseSensitive parameter to ParquetRowConverter so
that it handles materializing parquet data properly with respect to case sensitivity
### Why are the changes needed?
From Spark 3.0.0, the statement below throws IllegalArgumentException in
case-insensitive mode because of explicit field index searching in
ParquetRowConverter. Since the parquet requested schema and the catalyst requested
schema were already constructed during schema clipping in ParquetReadSupport, we
simply follow that behavior here.
```scala
val path = "/some/temp/path"
spark
.range(1L)
.selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS
StructColumn")
.write.parquet(path)
val caseInsensitiveSchema = new StructType()
.add(
"StructColumn",
new StructType()
.add("LowerCase", LongType)
.add("camelcase", LongType))
spark.read.schema(caseInsensitiveSchema).parquet(path).show()
```
### Does this PR introduce any user-facing change?
No. The changes are only in unreleased branches (`master` and `branch-3.0`).
### How was this patch tested?
Passed new test cases that check parquet column selection with respect to
schemas and case sensitivities
Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity.
Authored-by: Tae-kyeom, Kim <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e736c62764137b2c3af90d2dc8a77e391891200a)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../datasources/parquet/ParquetRowConverter.scala | 12 +++++--
.../spark/sql/FileBasedDataSourceSuite.scala | 40 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 850adae..22422c0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -33,8 +33,9 @@ import
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils,
GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData,
CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter(
// Converters for each field.
private[this] val fieldConverters: Array[Converter with
HasParentContainerUpdater] = {
+ // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is
false
+ // to prevent throwing IllegalArgumentException when searching catalyst
type's field index
+ val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
+ catalystType.fieldNames.zipWithIndex.toMap
+ } else {
+ CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+ }
parquetType.getFields.asScala.map { parquetField =>
- val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+ val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
val catalystField = catalystType(fieldIndex)
// Converted field value should be set to the `fieldIndex`-th cell of
`currentRow`
newConverter(parquetField, catalystField.dataType, new
RowUpdater(currentRow, fieldIndex))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index c870958..cb410b4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
}
+
+ test("SPARK-31116: Select nested schema with case insensitive mode") {
+ // This test case failed at only Parquet. ORC is added for test coverage
parity.
+ Seq("orc", "parquet").foreach { format =>
+ Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
+ withSQLConf(
+ SQLConf.CASE_SENSITIVE.key -> "false",
+ SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key ->
nestedSchemaPruningEnabled) {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ // Prepare values for testing nested parquet data
+ spark
+ .range(1L)
+ .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1)
AS StructColumn")
+ .write
+ .format(format)
+ .save(path)
+
+ val exactSchema = "StructColumn struct<lowercase: LONG, camelCase:
LONG>"
+
+
checkAnswer(spark.read.schema(exactSchema).format(format).load(path),
Row(Row(0, 1)))
+
+ // In case insensitive manner, parquet's column cases are ignored
+ val innerColumnCaseInsensitiveSchema =
+ "StructColumn struct<Lowercase: LONG, camelcase: LONG>"
+ checkAnswer(
+
spark.read.schema(innerColumnCaseInsensitiveSchema).format(format).load(path),
+ Row(Row(0, 1)))
+
+ val rootColumnCaseInsensitiveSchema =
+ "structColumn struct<lowercase: LONG, camelCase: LONG>"
+ checkAnswer(
+
spark.read.schema(rootColumnCaseInsensitiveSchema).format(format).load(path),
+ Row(Row(0, 1)))
+ }
+ }
+ }
+ }
+ }
}
object TestingUDT {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]