This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7ad2ea715 [GLUTEN-5547][VL] Add config to force fallback scan for timestamp type (#5546)
7ad2ea715 is described below
commit 7ad2ea71527e61a565106457e9d2eebe450e7882
Author: Mingliang Zhu <[email protected]>
AuthorDate: Sat Apr 27 15:51:32 2024 +0800
[GLUTEN-5547][VL] Add config to force fallback scan for timestamp type (#5546)
---
.../org/apache/gluten/backendsapi/velox/VeloxBackend.scala | 6 ++++++
.../gluten/execution/VeloxParquetDataTypeValidationSuite.scala | 8 ++++++++
.../common/src/main/scala/org/apache/gluten/GlutenConfig.scala | 10 ++++++++++
3 files changed, 24 insertions(+)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 8b7cec383..0e23ef8ba 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -92,6 +92,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
mapType.simpleString + " is forced to fallback."
case StructField(_, structType: StructType, _, _) =>
structType.simpleString + " is forced to fallback."
+ case StructField(_, timestampType: TimestampType, _, _)
+ if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
+ timestampType.simpleString + " is forced to fallback."
}
val orcTypeValidatorWithComplexTypeFallback: PartialFunction[StructField,
String] = {
case StructField(_, arrayType: ArrayType, _, _) =>
@@ -122,6 +125,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
case StructField(_, mapType: MapType, _, _)
if mapType.valueType.isInstanceOf[ArrayType] =>
"ArrayType as Value in MapType"
+ case StructField(_, TimestampType, _, _)
+ if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled =>
+ "TimestampType"
}
if (!GlutenConfig.getConf.forceComplexTypeScanFallbackEnabled) {
validateTypes(typeValidator)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
index bd42b6024..9793df2ab 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
@@ -435,6 +435,14 @@ class VeloxParquetDataTypeValidationSuite extends
VeloxWholeStageTransformerSuit
}
}
+ test("Force timestamp type scan fallback") {
+ withSQLConf(("spark.gluten.sql.parquet.timestampType.scan.fallback.enabled", "true")) {
+ val df = spark.sql("select timestamp from type1")
+ val executedPlan = getExecutedPlan(df)
+ assert(!executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
+ }
+ }
+
test("Decimal type") {
// Validation: BatchScan Project Aggregate Expand Sort Limit
runQueryAndCompare(
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index d19d8875f..3c7ddf32c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -114,6 +114,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def forceOrcCharTypeScanFallbackEnabled: Boolean =
conf.getConf(VELOX_FORCE_ORC_CHAR_TYPE_SCAN_FALLBACK)
+ def forceParquetTimestampTypeScanFallbackEnabled: Boolean =
+ conf.getConf(VELOX_FORCE_PARQUET_TIMESTAMP_TYPE_SCAN_FALLBACK)
+
// whether to use ColumnarShuffleManager
def isUseColumnarShuffleManager: Boolean =
conf
@@ -1806,6 +1809,13 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
+ val VELOX_FORCE_PARQUET_TIMESTAMP_TYPE_SCAN_FALLBACK =
+ buildConf("spark.gluten.sql.parquet.timestampType.scan.fallback.enabled")
+ .internal()
+ .doc("Force fallback for parquet timestamp type scan.")
+ .booleanConf
+ .createWithDefault(false)
+
val COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED =
buildConf("spark.gluten.sql.columnar.cast.avg")
.internal()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]