This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 400dc7b  [SPARK-36594][SQL] ORC vectorized reader should properly 
check maximal number of fields
400dc7b is described below

commit 400dc7ba67622d65a7f17e27dfb20ba0427accb9
Author: Cheng Su <[email protected]>
AuthorDate: Thu Aug 26 19:42:25 2021 +0800

    [SPARK-36594][SQL] ORC vectorized reader should properly check maximal 
number of fields
    
    ### What changes were proposed in this pull request?
    
    Debugged internally and found a bug: the decision to disable the vectorized 
reader should be based on the schema recursively. Currently we check that 
`schema.length` is no more than `wholeStageMaxNumFields` to enable vectorization. 
`schema.length` does not take the sub-fields of nested columns into account (i.e. 
it treats a nested column the same as a primitive column). This check is wrong 
when vectorization is enabled for nested columns. We should follow the [same 
check](https://github.com/apache/spark/blob/master/sql/ [...]
    
    ### Why are the changes needed?
    
    Avoid OOM/performance regressions when reading ORC tables with nested column 
types.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test in `OrcQuerySuite.scala`. Verified that the test fails 
without this change.
    
    Closes #33842 from c21/field-fix.
    
    Authored-by: Cheng Su <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../execution/datasources/orc/OrcFileFormat.scala  |  3 ++-
 .../v2/orc/OrcPartitionReaderFactory.scala         |  3 ++-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 26 ++++++++++++++++++++++
 3 files changed, 30 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 79c54bc..c4ffdb4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -118,7 +119,7 @@ class OrcFileFormat
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
     conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      schema.length <= conf.wholeStageMaxNumFields &&
+      !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
       schema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, 
sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 930adc0..c5020cb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -30,6 +30,7 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, 
OrcDeserializer, OrcFilters, OrcUtils}
 import org.apache.spark.sql.execution.datasources.v2._
@@ -63,7 +64,7 @@ case class OrcPartitionReaderFactory(
 
   override def supportColumnarReads(partition: InputPartition): Boolean = {
     sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
-      resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
+      !WholeStageCodegenExec.isTooManyFields(sqlConf, resultSchema) &&
       resultSchema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, sqlConf.orcVectorizedReaderNestedColumnEnabled))
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 680c2cf..e4c33e96f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -742,6 +742,32 @@ abstract class OrcQuerySuite extends OrcQueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-36594: ORC vectorized reader should properly check maximal 
number of fields") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(10).map { x =>
+        val stringColumn = s"$x" * 10
+        val structColumn = (x, s"$x" * 100)
+        val arrayColumn = (0 until 5).map(i => (x + i, s"$x" * 5))
+        val mapColumn = Map(s"$x" -> (x * 0.1, (x, s"$x" * 100)))
+        (x, stringColumn, structColumn, arrayColumn, mapColumn)
+      }.toDF("int_col", "string_col", "struct_col", "array_col", "map_col")
+      df.write.format("orc").save(path)
+
+      Seq(("5", false), ("10", true)).foreach {
+        case (maxNumFields, vectorizedEnabled) =>
+          withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key 
-> "true",
+            SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> maxNumFields) {
+            val scanPlan = spark.read.orc(path).queryExecution.executedPlan
+            assert(scanPlan.find {
+              case scan @ (_: FileSourceScanExec | _: BatchScanExec) => 
scan.supportsColumnar
+              case _ => false
+            }.isDefined == vectorizedEnabled)
+          }
+      }
+    }
+  }
 }
 
 class OrcV1QuerySuite extends OrcQuerySuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to