This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 400dc7b  [SPARK-36594][SQL] ORC vectorized reader should properly check maximal number of fields
400dc7b is described below
commit 400dc7ba67622d65a7f17e27dfb20ba0427accb9
Author: Cheng Su <[email protected]>
AuthorDate: Thu Aug 26 19:42:25 2021 +0800
[SPARK-36594][SQL] ORC vectorized reader should properly check maximal number of fields
### What changes were proposed in this pull request?
While debugging internally, we found a bug: the decision to enable the
vectorized reader should be based on the schema recursively. Currently we
enable vectorization when `schema.length` is no more than
`wholeStageMaxNumFields`, but `schema.length` counts only top-level columns
and ignores the sub-fields of nested columns (i.e. it treats a nested column
the same as a primitive column). This check becomes incorrect once
vectorization is enabled for nested columns. We should follow the [same
check](https://github.com/apache/spark/blob/master/sql/ [...]
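The fix delegates to `WholeStageCodegenExec.isTooManyFields`, which compares a
recursive leaf-field count against `wholeStageMaxNumFields` rather than the
top-level column count. Below is a minimal sketch of that recursive counting;
`countLeafFields` is an illustrative name, and the real helper lives in
`WholeStageCodegenExec`:

```scala
import org.apache.spark.sql.types._

// Count primitive leaves recursively: a struct/array/map column contributes
// one count per leaf field, not one count per top-level column.
def countLeafFields(dataType: DataType): Int = dataType match {
  case s: StructType => s.fields.map(f => countLeafFields(f.dataType)).sum
  case m: MapType    => countLeafFields(m.keyType) + countLeafFields(m.valueType)
  case a: ArrayType  => countLeafFields(a.elementType)
  case _             => 1
}

// Vectorization stays enabled only while the recursive count fits the limit:
// countLeafFields(schema) <= conf.wholeStageMaxNumFields
```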
### Why are the changes needed?
Avoid OOM/performance regressions when reading ORC tables with nested column
types.
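For illustration, here is a made-up schema where the old check passes but the
recursive count exceeds the limit (the column name and field count below are
hypothetical):

```scala
import org.apache.spark.sql.types._

// One top-level column that expands to 200 primitive leaf fields.
val schema = StructType(Seq(
  StructField("nested", StructType(
    (1 to 200).map(i => StructField(s"f$i", IntegerType))))))

// schema.length == 1, so the old `schema.length <= wholeStageMaxNumFields`
// check passes even with the default limit of 100, yet the columnar reader
// must allocate a column vector per leaf field, hence the OOM risk.
```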
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test in `OrcQuerySuite.scala` and verified that the test fails
without this change.
Closes #33842 from c21/field-fix.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../execution/datasources/orc/OrcFileFormat.scala | 3 ++-
.../v2/orc/OrcPartitionReaderFactory.scala | 3 ++-
.../execution/datasources/orc/OrcQuerySuite.scala | 26 ++++++++++++++++++++++
3 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 79c54bc..c4ffdb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -118,7 +119,7 @@ class OrcFileFormat
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
val conf = sparkSession.sessionState.conf
conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
- schema.length <= conf.wholeStageMaxNumFields &&
+ !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
schema.forall(s => OrcUtils.supportColumnarReads(
s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 930adc0..c5020cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -30,6 +30,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils}
import org.apache.spark.sql.execution.datasources.v2._
@@ -63,7 +64,7 @@ case class OrcPartitionReaderFactory(
override def supportColumnarReads(partition: InputPartition): Boolean = {
sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
- resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
+ !WholeStageCodegenExec.isTooManyFields(sqlConf, resultSchema) &&
resultSchema.forall(s => OrcUtils.supportColumnarReads(
s.dataType, sqlConf.orcVectorizedReaderNestedColumnEnabled))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 680c2cf..e4c33e96f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -742,6 +742,32 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-36594: ORC vectorized reader should properly check maximal
number of fields") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val df = spark.range(10).map { x =>
+ val stringColumn = s"$x" * 10
+ val structColumn = (x, s"$x" * 100)
+ val arrayColumn = (0 until 5).map(i => (x + i, s"$x" * 5))
+ val mapColumn = Map(s"$x" -> (x * 0.1, (x, s"$x" * 100)))
+ (x, stringColumn, structColumn, arrayColumn, mapColumn)
+ }.toDF("int_col", "string_col", "struct_col", "array_col", "map_col")
+ df.write.format("orc").save(path)
+
+ Seq(("5", false), ("10", true)).foreach {
+ case (maxNumFields, vectorizedEnabled) =>
+ withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+ SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) {
+ val scanPlan = spark.read.orc(path).queryExecution.executedPlan
+ assert(scanPlan.find {
+ case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
+ case _ => false
+ }.isDefined == vectorizedEnabled)
+ }
+ }
+ }
+ }
}
class OrcV1QuerySuite extends OrcQuerySuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]