This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 51e1428c2 [GLUTEN-4796][VL] Force fallback for orc char type scan 
(#4797)
51e1428c2 is described below

commit 51e1428c28e4b8a77befa9890e0ec40fed9553e7
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Mar 18 14:03:05 2024 +0800

    [GLUTEN-4796][VL] Force fallback for orc char type scan (#4797)
    
    
    #4259 resolved the exception that occurred when a char column was 
included in the select statement. This commit fixes the exception that 
occurs when the char column is not included in the select statement.
    
    Fixes issue: #4796
---
 .../backendsapi/velox/VeloxBackend.scala           | 18 ++++--
 .../execution/BasicScanExecTransformer.scala       | 32 +++++++++--
 .../hive/execution/GlutenHiveSQLQuerySuite.scala   | 66 +++++++++++++++++++++-
 .../main/scala/io/glutenproject/GlutenConfig.scala | 10 ++++
 4 files changed, 113 insertions(+), 13 deletions(-)

diff --git 
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 22a0f4ebc..12a76a3d2 100644
--- 
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -102,9 +102,7 @@ object BackendSettings extends BackendSettingsApi {
       case StructField(_, structType: StructType, _, _) =>
         structType.simpleString + " is forced to fallback."
       case StructField(_, stringType: StringType, _, metadata)
-          if CharVarcharUtils
-            .getRawTypeString(metadata)
-            .getOrElse(stringType.catalogString) != stringType.catalogString =>
+          if isCharType(stringType, metadata) =>
         CharVarcharUtils.getRawTypeString(metadata) + " not support"
       case StructField(_, TimestampType, _, _) => "TimestampType not support"
     }
@@ -151,9 +149,7 @@ object BackendSettings extends BackendSettingsApi {
                 if mapType.valueType.isInstanceOf[ArrayType] =>
               "ArrayType as Value in MapType"
             case StructField(_, stringType: StringType, _, metadata)
-                if CharVarcharUtils
-                  .getRawTypeString(metadata)
-                  .getOrElse(stringType.catalogString) != 
stringType.catalogString =>
+                if isCharType(stringType, metadata) =>
               CharVarcharUtils.getRawTypeString(metadata) + " not support"
             case StructField(_, TimestampType, _, _) => "TimestampType not 
support"
           }
@@ -167,6 +163,16 @@ object BackendSettings extends BackendSettingsApi {
     }
   }
 
+  def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
+    val charTypePattern = "char\\((\\d+)\\)".r
+    GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern
+      .findFirstIn(
+        CharVarcharUtils
+          .getRawTypeString(metadata)
+          .getOrElse(stringType.catalogString))
+      .isDefined
+  }
+
   override def supportWriteFilesExec(
       format: FileFormat,
       fields: Array[StructField],
diff --git 
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
 
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
index 2be1a162c..997395c0c 100644
--- 
a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
@@ -28,7 +28,8 @@ import 
io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.types.{BooleanType, StringType, StructField, 
StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import com.google.common.collect.Lists
@@ -91,12 +92,20 @@ trait BasicScanExecTransformer extends LeafTransformSupport 
with BaseDataSource
   }
 
   override protected def doValidateInternal(): ValidationResult = {
+    var fields = schema.fields
+
+    this match {
+      case transformer: FileSourceScanExecTransformer =>
+        fields = appendStringFields(transformer.relation.schema, fields)
+      case transformer: HiveTableScanExecTransformer =>
+        fields = appendStringFields(transformer.getDataSchema, fields)
+      case transformer: BatchScanExecTransformer =>
+        fields = appendStringFields(transformer.getDataSchema, fields)
+      case _ =>
+    }
+
     val validationResult = BackendsApiManager.getSettings
-      .supportFileFormatRead(
-        fileFormat,
-        schema.fields,
-        getPartitionSchema.nonEmpty,
-        getInputFilePaths)
+      .supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty, 
getInputFilePaths)
     if (!validationResult.isValid) {
       return validationResult
     }
@@ -107,6 +116,17 @@ trait BasicScanExecTransformer extends 
LeafTransformSupport with BaseDataSource
     doNativeValidation(substraitContext, relNode)
   }
 
+  def appendStringFields(
+      schema: StructType,
+      existingFields: Array[StructField]): Array[StructField] = {
+    val stringFields = 
schema.fields.filter(_.dataType.isInstanceOf[StringType])
+    if (stringFields.nonEmpty) {
+      (existingFields ++ stringFields).distinct
+    } else {
+      existingFields
+    }
+  }
+
   override def doTransform(context: SubstraitContext): TransformContext = {
     val output = outputAttributes()
     val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 00dd65b1c..4341d09b8 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.hive.execution
 
-import io.glutenproject.execution.TransformSupport
+import io.glutenproject.execution.{FileSourceScanExecTransformer, 
TransformSupport}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, 
Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
@@ -119,6 +120,69 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
       purge = false)
   }
 
+  testGluten("Add orc char type validation") {
+    withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") {
+      sql("DROP TABLE IF EXISTS test_orc")
+      sql(
+        "CREATE TABLE test_orc (name char(10), id int)" +
+          " USING hive OPTIONS(fileFormat 'orc')")
+      sql("INSERT INTO test_orc VALUES('test', 1)")
+    }
+
+    def testExecPlan(
+        convertMetastoreOrc: String,
+        charTypeFallbackEnabled: String,
+        shouldFindTransformer: Boolean,
+        transformerClass: Class[_ <: SparkPlan]
+    ): Unit = {
+
+      withSQLConf(
+        "spark.sql.hive.convertMetastoreOrc" -> convertMetastoreOrc,
+        "spark.gluten.sql.orc.charType.scan.fallback.enabled" -> 
charTypeFallbackEnabled
+      ) {
+        val queries = Seq("select id from test_orc", "select name, id from 
test_orc")
+
+        queries.foreach {
+          query =>
+            val executedPlan = getExecutedPlan(spark.sql(query))
+            val planCondition = 
executedPlan.exists(_.find(transformerClass.isInstance).isDefined)
+
+            if (shouldFindTransformer) {
+              assert(planCondition)
+            } else {
+              assert(!planCondition)
+            }
+        }
+      }
+    }
+
+    testExecPlan(
+      "false",
+      "true",
+      shouldFindTransformer = false,
+      classOf[HiveTableScanExecTransformer])
+    testExecPlan(
+      "false",
+      "false",
+      shouldFindTransformer = true,
+      classOf[HiveTableScanExecTransformer])
+
+    testExecPlan(
+      "true",
+      "true",
+      shouldFindTransformer = false,
+      classOf[FileSourceScanExecTransformer])
+    testExecPlan(
+      "true",
+      "false",
+      shouldFindTransformer = true,
+      classOf[FileSourceScanExecTransformer])
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_orc"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
+
   testGluten("avoid unnecessary filter binding for subfield during scan") {
     withSQLConf(
       "spark.sql.hive.convertMetastoreParquet" -> "false",
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala 
b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index 8a8bd4152..48e37cdb3 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -103,6 +103,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def forceComplexTypeScanFallbackEnabled: Boolean =
     conf.getConf(VELOX_FORCE_COMPLEX_TYPE_SCAN_FALLBACK)
 
+  def forceOrcCharTypeScanFallbackEnabled: Boolean =
+    conf.getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
+
   // whether to use ColumnarShuffleManager
   def isUseColumnarShuffleManager: Boolean =
     conf
@@ -1681,4 +1684,11 @@ object GlutenConfig {
       .doc("Force fallback for complex type scan, including struct, map, 
array.")
       .booleanConf
       .createWithDefault(true)
+
+  val VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK =
+    buildConf("spark.gluten.sql.orc.charType.scan.fallback.enabled")
+      .internal()
+      .doc("Force fallback for orc char type scan.")
+      .booleanConf
+      .createWithDefault(true)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to