This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 51e1428c2 [GLUTEN-4796][VL] Force fallback for orc char type scan (#4797)
51e1428c2 is described below
commit 51e1428c28e4b8a77befa9890e0ec40fed9553e7
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Mar 18 14:03:05 2024 +0800
[GLUTEN-4796][VL] Force fallback for orc char type scan (#4797)
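The fix for #4259 handled the exception raised when a char column was part of the
select list. This change fixes the exception that still occurs when the table has a
char column that is not selected: scan validation now also checks the string columns
of the full table schema, so a char(n) column forces the scan to fall back even when
it is not read (controlled by spark.gluten.sql.orc.charType.scan.fallback.enabled,
enabled by default).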
Fixes issue: #4796
---
.../backendsapi/velox/VeloxBackend.scala | 18 ++++--
.../execution/BasicScanExecTransformer.scala | 32 +++++++++--
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 66 +++++++++++++++++++++-
.../main/scala/io/glutenproject/GlutenConfig.scala | 10 ++++
4 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 22a0f4ebc..12a76a3d2 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -102,9 +102,7 @@ object BackendSettings extends BackendSettingsApi {
case StructField(_, structType: StructType, _, _) =>
structType.simpleString + " is forced to fallback."
case StructField(_, stringType: StringType, _, metadata)
- if CharVarcharUtils
- .getRawTypeString(metadata)
- .getOrElse(stringType.catalogString) != stringType.catalogString =>
+ if isCharType(stringType, metadata) =>
CharVarcharUtils.getRawTypeString(metadata) + " not support"
case StructField(_, TimestampType, _, _) => "TimestampType not support"
}
@@ -151,9 +149,7 @@ object BackendSettings extends BackendSettingsApi {
if mapType.valueType.isInstanceOf[ArrayType] =>
"ArrayType as Value in MapType"
case StructField(_, stringType: StringType, _, metadata)
- if CharVarcharUtils
- .getRawTypeString(metadata)
- .getOrElse(stringType.catalogString) !=
stringType.catalogString =>
+ if isCharType(stringType, metadata) =>
CharVarcharUtils.getRawTypeString(metadata) + " not support"
case StructField(_, TimestampType, _, _) => "TimestampType not
support"
}
@@ -167,6 +163,16 @@ object BackendSettings extends BackendSettingsApi {
}
}
+ def isCharType(stringType: StringType, metadata: Metadata): Boolean = { // True when the ORC char-type fallback flag is on and the column's raw declared type (CharVarcharUtils metadata, else catalogString) contains "char(<digits>)".
+ val charTypePattern = "char\\((\\d+)\\)".r // NOTE(review): unanchored pattern + findFirstIn also matches "varchar(10)" via its "char(10)" substring; confirm varchar fallback is intended. The Regex is also rebuilt on every call.
+ GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern // && short-circuits: no regex work at all when the flag is disabled.
+ .findFirstIn(
+ CharVarcharUtils
+ .getRawTypeString(metadata)
+ .getOrElse(stringType.catalogString)) // No raw-type metadata: use the plain catalog string (presumably "string"), which does not match.
+ .isDefined
+ }
+
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
index 2be1a162c..997395c0c 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala
@@ -28,7 +28,8 @@ import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.BooleanType
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.types.{BooleanType, StringType, StructField,
StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import com.google.common.collect.Lists
@@ -91,12 +92,20 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
}
override protected def doValidateInternal(): ValidationResult = {
+ var fields = schema.fields
+
+ this match {
+ case transformer: FileSourceScanExecTransformer =>
+ fields = appendStringFields(transformer.relation.schema, fields)
+ case transformer: HiveTableScanExecTransformer =>
+ fields = appendStringFields(transformer.getDataSchema, fields)
+ case transformer: BatchScanExecTransformer =>
+ fields = appendStringFields(transformer.getDataSchema, fields)
+ case _ =>
+ }
+
val validationResult = BackendsApiManager.getSettings
- .supportFileFormatRead(
- fileFormat,
- schema.fields,
- getPartitionSchema.nonEmpty,
- getInputFilePaths)
+ .supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty,
getInputFilePaths)
if (!validationResult.isValid) {
return validationResult
}
@@ -107,6 +116,17 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
doNativeValidation(substraitContext, relNode)
}
+ def appendStringFields( // Returns existingFields plus every top-level StringType field of schema, so validation also sees string (e.g. char) columns that the scan does not project.
+ schema: StructType,
+ existingFields: Array[StructField]): Array[StructField] = { // NOTE(review): parameter `schema` shadows the trait's own `schema` member inside this method.
+ val stringFields =
schema.fields.filter(_.dataType.isInstanceOf[StringType]) // Top-level fields only; strings nested inside struct/array/map types are not collected.
+ if (stringFields.nonEmpty) {
+ (existingFields ++ stringFields).distinct // Existing fields keep their order; StructField is a case class, so a field whose nullability/metadata differs is kept twice.
+ } else {
+ existingFields // Nothing to add: return the input array unchanged.
+ }
+ }
+
override def doTransform(context: SubstraitContext): TransformContext = {
val output = outputAttributes()
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 00dd65b1c..4341d09b8 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.hive.execution
-import io.glutenproject.execution.TransformSupport
+import io.glutenproject.execution.{FileSourceScanExecTransformer,
TransformSupport}
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.{HiveTableScanExecTransformer, HiveUtils}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -119,6 +120,69 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
purge = false)
}
+ testGluten("Add orc char type validation") { // Scan transformers must be absent when the char fallback flag is on and present when it is off, for both Hive-serde and converted ORC reads.
+ withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "false") { // Create and populate test_orc with convertMetastoreOrc disabled.
+ sql("DROP TABLE IF EXISTS test_orc")
+ sql(
+ "CREATE TABLE test_orc (name char(10), id int)" +
+ " USING hive OPTIONS(fileFormat 'orc')")
+ sql("INSERT INTO test_orc VALUES('test', 1)")
+ }
+
+ def testExecPlan( // Runs both queries under the given confs and asserts whether a transformerClass node appears in each executed plan.
+ convertMetastoreOrc: String,
+ charTypeFallbackEnabled: String,
+ shouldFindTransformer: Boolean,
+ transformerClass: Class[_ <: SparkPlan]
+ ): Unit = {
+
+ withSQLConf( // Both confs are scoped to this block.
+ "spark.sql.hive.convertMetastoreOrc" -> convertMetastoreOrc,
+ "spark.gluten.sql.orc.charType.scan.fallback.enabled" ->
charTypeFallbackEnabled
+ ) {
+ val queries = Seq("select id from test_orc", "select name, id from
test_orc") // "select id" skips the char(10) column; "select name, id" reads it.
+
+ queries.foreach {
+ query =>
+ val executedPlan = getExecutedPlan(spark.sql(query))
+ val planCondition =
executedPlan.exists(_.find(transformerClass.isInstance).isDefined) // True when some node of the executed plan is an instance of transformerClass.
+
+ if (shouldFindTransformer) {
+ assert(planCondition)
+ } else {
+ assert(!planCondition)
+ }
+ }
+ }
+ }
+
+ testExecPlan( // Hive serde read (convertMetastoreOrc=false): flag on -> no HiveTableScanExecTransformer; flag off -> transformer expected.
+ "false",
+ "true",
+ shouldFindTransformer = false,
+ classOf[HiveTableScanExecTransformer])
+ testExecPlan(
+ "false",
+ "false",
+ shouldFindTransformer = true,
+ classOf[HiveTableScanExecTransformer])
+
+ testExecPlan( // Converted datasource read (convertMetastoreOrc=true): same expectations for FileSourceScanExecTransformer.
+ "true",
+ "true",
+ shouldFindTransformer = false,
+ classOf[FileSourceScanExecTransformer])
+ testExecPlan(
+ "true",
+ "false",
+ shouldFindTransformer = true,
+ classOf[FileSourceScanExecTransformer])
+ spark.sessionState.catalog.dropTable( // NOTE(review): not in a finally block, so a failed assertion above leaves test_orc behind for later tests.
+ TableIdentifier("test_orc"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
+
testGluten("avoid unnecessary filter binding for subfield during scan") {
withSQLConf(
"spark.sql.hive.convertMetastoreParquet" -> "false",
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index 8a8bd4152..48e37cdb3 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -103,6 +103,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def forceComplexTypeScanFallbackEnabled: Boolean =
conf.getConf(VELOX_FORCE_COMPLEX_TYPE_SCAN_FALLBACK)
+ def forceOrcCharTypeScanFallbackEnabled: Boolean = // Reads spark.gluten.sql.orc.charType.scan.fallback.enabled (default true).
+ conf.getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
+
// whether to use ColumnarShuffleManager
def isUseColumnarShuffleManager: Boolean =
conf
@@ -1681,4 +1684,11 @@ object GlutenConfig {
.doc("Force fallback for complex type scan, including struct, map,
array.")
.booleanConf
.createWithDefault(true)
+
+ val VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK = // When true, scan validation rejects string columns whose raw type matches char(n) (see isCharType in VeloxBackend), forcing a fallback.
+ buildConf("spark.gluten.sql.orc.charType.scan.fallback.enabled")
+ .internal() // Internal conf: not intended as a user-facing tuning knob.
+ .doc("Force fallback for orc char type scan.")
+ .booleanConf
+ .createWithDefault(true) // On by default, so char columns fall back unless explicitly disabled.
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]