This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new c21303f [SPARK-36594][SQL][3.2] ORC vectorized reader should properly check maximal number of fields
c21303f is described below
commit c21303f02c582e97fefc130415e739ddda8dd43e
Author: Cheng Su <[email protected]>
AuthorDate: Thu Aug 26 14:55:21 2021 +0800
[SPARK-36594][SQL][3.2] ORC vectorized reader should properly check maximal number of fields
### What changes were proposed in this pull request?
This is the branch-3.2 patch for https://github.com/apache/spark/pull/33842. See the description in that PR.
### Why are the changes needed?
Avoid an OOM/performance regression when reading ORC tables with nested column types: the previous check compared only the number of top-level fields (`schema.length`) against the whole-stage-codegen field limit, so a schema with many nested leaf fields could still take the vectorized batch path.
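For context, a minimal sketch (not part of the patch) of why the top-level count is insufficient. It assumes only Spark's built-in `StructType` API plus the recursive leaf-field counting performed by `WholeStageCodegenExec.isTooManyFields`:

```scala
import org.apache.spark.sql.types._

// A schema with only 2 top-level fields, but 51 leaf fields overall.
val nested = StructType((0 until 50).map(i => StructField(s"c$i", StringType)))
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("payload", nested)))

// Old check: schema.length == 2, well under spark.sql.codegen.maxFields
// (default 100), so the vectorized batch path stayed enabled.
assert(schema.length == 2)

// New check: WholeStageCodegenExec.isTooManyFields counts leaf fields
// recursively (51 here), so a nested schema can now correctly trip the
// limit before the reader allocates column vectors for every leaf.
```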
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test in `OrcSourceSuite.scala`.
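Outside the unit test, the effect can also be observed interactively; a hypothetical spark-shell session (the path and limit are illustrative only):

```scala
// Hypothetical session, not from this patch. With the nested-column ORC
// vectorized reader enabled and a low field limit, a scan over a nested
// schema should fall back from the columnar batch path to the row reader.
spark.conf.set("spark.sql.orc.enableNestedColumnVectorizedReader", "true")
spark.conf.set("spark.sql.codegen.maxFields", "5")

// /tmp/orc_table stands in for any ORC table with nested columns.
val plan = spark.read.orc("/tmp/orc_table").queryExecution.executedPlan
// After this fix, no FileSourceScanExec in `plan` reports supportsColumnar
// once the recursive leaf-field count exceeds the limit above.
```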
Closes #33843 from c21/branch-3.2.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/datasources/orc/OrcFileFormat.scala | 3 ++-
.../execution/datasources/orc/OrcSourceSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 9251d33..5b08f51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -132,7 +133,7 @@ class OrcFileFormat
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
- schema.length <= conf.wholeStageMaxNumFields &&
+ !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
schema.forall(s => supportDataType(s.dataType) &&
!s.dataType.isInstanceOf[UserDefinedType[_]]) &&
supportBatchForNestedColumn(sparkSession, schema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 9acf59c..348ef6f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -659,4 +659,30 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
checkAnswer(spark.sql("SELECT _col0, _col2.c1 FROM t1"), Seq(Row(1, "a")))
}
}
+
+ test("SPARK-36594: ORC vectorized reader should properly check maximal
number of fields") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = spark.range(10).map { x =>
+ val stringColumn = s"$x" * 10
+ val structColumn = (x, s"$x" * 100)
+ val arrayColumn = (0 until 5).map(i => (x + i, s"$x" * 5))
+ val mapColumn = Map(s"$x" -> (x * 0.1, (x, s"$x" * 100)))
+ (x, stringColumn, structColumn, arrayColumn, mapColumn)
+ }.toDF("int_col", "string_col", "struct_col", "array_col", "map_col")
+ df.write.format("orc").save(path)
+
+ Seq(("5", false), ("10", true)).foreach {
+ case (maxNumFields, vectorizedEnabled) =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+ SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) {
+ val scanPlan = spark.read.orc(path).queryExecution.executedPlan
+ assert(scanPlan.find {
+ case scan: FileSourceScanExec => scan.supportsColumnar
+ case _ => false
+ }.isDefined == vectorizedEnabled)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]