This is an automated email from the ASF dual-hosted git repository.
rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7dde1014dc [GLUTEN-11622][VL] Fallback TimestampNTZ to Spark (#11609)
7dde1014dc is described below
commit 7dde1014dc63fb596acfebf2d630b871bd0470c9
Author: Ankita Victor <[email protected]>
AuthorDate: Thu Feb 19 16:11:33 2026 +0530
[GLUTEN-11622][VL] Fallback TimestampNTZ to Spark (#11609)
---
.../VeloxParquetDataTypeValidationSuite.scala | 14 +++++
.../apache/spark/sql/utils/SparkArrowUtil.scala | 17 +++++-
.../org/apache/gluten/execution/DeltaSuite.scala | 62 ++++++++++++++++++++++
.../extension/columnar/validator/Validators.scala | 26 +++++++++
4 files changed, 117 insertions(+), 2 deletions(-)
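
In short: the new validator tags any operator whose input or output schema
contains TIMESTAMP_NTZ so that it falls back to vanilla Spark, and
SparkArrowUtil gains an Arrow mapping for the type so the row/columnar
transitions around the fallback keep working. A rough sketch of the
user-visible behavior (hypothetical session, path invented for the example):

    // A scan over a TIMESTAMP_NTZ column is no longer offloaded to Velox;
    // the query runs on vanilla Spark operators and returns the same rows.
    spark
      .sql("SELECT CAST('2024-01-01 00:00:00' AS TIMESTAMP_NTZ) AS ts_ntz")
      .write
      .parquet("/tmp/ntz_data") // hypothetical path
    spark.read.parquet("/tmp/ntz_data").collect()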
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
index cf63cd4453..a5e814dc0f 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
@@ -465,6 +465,20 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
}
}
+ testWithMinSparkVersion("Fallback for TimestampNTZ type scan", "3.4") {
+ withTempDir {
+ dir =>
+ val path = new File(dir, "ntz_data").toURI.getPath
+ val inputDf =
+ spark.sql("SELECT CAST('2024-01-01 00:00:00' AS TIMESTAMP_NTZ) AS ts_ntz")
+ inputDf.write.format("parquet").save(path)
+ val df = spark.read.format("parquet").load(path)
+ val executedPlan = getExecutedPlan(df)
+ assert(!executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
+ checkAnswer(df, inputDf)
+ }
+ }
+
test("Velox Parquet Write") {
withSQLConf((GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")) {
withTempDir {
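
For reference, a rough way to observe the fallback on a physical plan with
plain Spark APIs, independent of Gluten test helpers such as getExecutedPlan.
The node name is Gluten-specific, and the sketch assumes the plan has no
adaptive wrapper (simple scans without exchanges are not adaptively executed):

    // Walk the executed plan; a fallen-back scan shows no Gluten transformer.
    val nodeNames = df.queryExecution.executedPlan.collect { case p => p.nodeName }
    assert(!nodeNames.exists(_.contains("BatchScanExecTransformer")))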
diff --git a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
index da3f5c0708..0e2ce5e309 100644
--- a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
@@ -50,6 +50,8 @@ object SparkArrowUtil {
} else {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")
}
+ case dt if dt.catalogString == "timestamp_ntz" =>
+ new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
case YearMonthIntervalType.DEFAULT =>
new ArrowType.Interval(IntervalUnit.YEAR_MONTH)
case _: ArrayType => ArrowType.List.INSTANCE
@@ -72,7 +74,17 @@ object SparkArrowUtil {
case ArrowType.Binary.INSTANCE => BinaryType
case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
- // TODO: Time unit is not handled.
+ case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND && ts.getTimezone == null =>
+ // TimestampNTZType is only available in Spark 3.4+
+ try {
+ Class
+ .forName("org.apache.spark.sql.types.TimestampNTZType$")
+ .getField("MODULE$")
+ .get(null)
+ .asInstanceOf[DataType]
+ } catch {
+ case _: ClassNotFoundException => TimestampType
+ }
case _: ArrowType.Timestamp => TimestampType
case interval: ArrowType.Interval if interval.getUnit == IntervalUnit.YEAR_MONTH =>
YearMonthIntervalType.DEFAULT
@@ -156,7 +168,8 @@ object SparkArrowUtil {
}.asJava)
}
- // TimestampNTZ does not support
+ // TimestampNTZ is not supported for native computation, but the Arrow type mapping is needed
+ // for row-to-columnar transitions when the fallback validator tags NTZ operators.
def checkSchema(schema: StructType): Boolean = {
try {
SparkSchemaUtil.toArrowSchema(schema)
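
The reflective lookup above is the usual trick for referencing a Scala object
that may be missing from the classpath: object TimestampNTZType compiles to a
class TimestampNTZType$ whose singleton sits in a static MODULE$ field. A
self-contained sketch of the pattern (the helper name is ours, not from the
patch):

    import org.apache.spark.sql.types.{DataType, TimestampType}

    // Load the singleton of a Scala `object` by class name; None when the
    // class is absent (e.g. TimestampNTZType$ on Spark < 3.4).
    def loadSingleton[T](className: String): Option[T] =
      try Some(Class.forName(className).getField("MODULE$").get(null).asInstanceOf[T])
      catch { case _: ClassNotFoundException => None }

    val ntzOrFallback: DataType =
      loadSingleton[DataType]("org.apache.spark.sql.types.TimestampNTZType$")
        .getOrElse(TimestampType)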
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 57e9028361..4dc487f1ba 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -340,4 +340,66 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
}
}
}
+
+ // TIMESTAMP_NTZ was introduced in Spark 3.4 / Delta 2.4
+ testWithMinSparkVersion(
+ "delta: create table with TIMESTAMP_NTZ should fallback and return correct results",
+ "3.4") {
+ withTable("delta_ntz") {
+ spark.sql("CREATE TABLE delta_ntz(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA")
+ spark.sql("""INSERT INTO delta_ntz VALUES
+ |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
+ val df = runQueryAndCompare("select * from delta_ntz", noFallBack = false) { _ => }
+ checkAnswer(
+ df,
+ Row(
+ "foo",
+ java.sql.Timestamp.valueOf("2022-01-02 03:04:05.123456"),
+ java.time.LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)))
+ }
+ }
+
+ testWithMinSparkVersion(
+ "delta: TIMESTAMP_NTZ as partition column should fallback and return correct results",
+ "3.4") {
+ withTable("delta_ntz_part") {
+ spark.sql("""CREATE TABLE delta_ntz_part(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ)
+ |USING DELTA PARTITIONED BY (c3)""".stripMargin)
+ spark.sql("""INSERT INTO delta_ntz_part VALUES
+ |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456'),
+ |('bar','2023-06-15 10:30:00.000000','2023-06-15 10:30:00.000000')""".stripMargin)
+ val df = runQueryAndCompare("select * from delta_ntz_part order by c1", noFallBack = false) {
+ _ =>
+ }
+ checkAnswer(
+ df,
+ Seq(
+ Row(
+ "bar",
+ java.sql.Timestamp.valueOf("2023-06-15 10:30:00"),
+ java.time.LocalDateTime.of(2023, 6, 15, 10, 30, 0, 0)),
+ Row(
+ "foo",
+ java.sql.Timestamp.valueOf("2022-01-02 03:04:05.123456"),
+ java.time.LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))
+ )
+ )
+ }
+ }
+
+ testWithMinSparkVersion(
+ "delta: filter on TIMESTAMP_NTZ column should fallback and return correct results",
+ "3.4") {
+ withTable("delta_ntz_filter") {
+ spark.sql("CREATE TABLE delta_ntz_filter(id INT, ts TIMESTAMP_NTZ) USING DELTA")
+ spark.sql("""INSERT INTO delta_ntz_filter VALUES
+ |(1, '2022-01-01 00:00:00'),
+ |(2, '2023-01-01 00:00:00'),
+ |(3, '2024-01-01 00:00:00')""".stripMargin)
+ val df = runQueryAndCompare(
+ "select id from delta_ntz_filter where ts > '2022-06-01 00:00:00'",
+ noFallBack = false) { _ => }
+ checkAnswer(df, Seq(Row(2), Row(3)))
+ }
+ }
}
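
The expected rows above mix java.sql.Timestamp and java.time.LocalDateTime
because that is how Spark surfaces the two timestamp types on the JVM:
TIMESTAMP is session-time-zone aware and collects as java.sql.Timestamp,
while TIMESTAMP_NTZ carries no zone and collects as java.time.LocalDateTime.
A quick standalone sketch (plain Spark 3.4+, no Gluten involved):

    val row = spark
      .sql("SELECT TIMESTAMP'2022-01-02 03:04:05' AS ts, " +
        "TIMESTAMP_NTZ'2022-01-02 03:04:05' AS ts_ntz")
      .collect()(0)
    // TIMESTAMP -> java.sql.Timestamp; TIMESTAMP_NTZ -> java.time.LocalDateTime
    assert(row.get(0).isInstanceOf[java.sql.Timestamp])
    assert(row.get(1).isInstanceOf[java.time.LocalDateTime])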
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index caa6ba16ba..b20663093f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
object Validators {
implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
@@ -78,6 +79,11 @@ object Validators {
builder.add(new FallbackByTestInjects())
}
+ /** Fails validation if a plan node's input or output schema contains TimestampNTZType. */
+ def fallbackByTimestampNTZ(): Validator.Builder = {
+ builder.add(new FallbackByTimestampNTZ())
+ }
+
/**
* Fails validation on non-scan plan nodes if Gluten is running as scan-only mode. Also, passes
* validation on filter for the exception that filter + scan is detected. Because filters can be
@@ -212,6 +218,25 @@ object Validators {
}
}
+ private class FallbackByTimestampNTZ() extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = {
+ def containsNTZ(dataType: DataType): Boolean = dataType match {
+ case dt if dt.catalogString == "timestamp_ntz" => true
+ case st: StructType => st.exists(f => containsNTZ(f.dataType))
+ case at: ArrayType => containsNTZ(at.elementType)
+ case mt: MapType => containsNTZ(mt.keyType) || containsNTZ(mt.valueType)
+ case _ => false
+ }
+ val hasNTZ = plan.output.exists(a => containsNTZ(a.dataType)) ||
+ plan.children.exists(_.output.exists(a => containsNTZ(a.dataType)))
+ if (hasNTZ) {
+ fail(s"${plan.nodeName} has TimestampNTZType in input/output schema")
+ } else {
+ pass()
+ }
+ }
+ }
+
private class FallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean) extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = {
if (!scanOnly) {
@@ -292,6 +317,7 @@ object Validators {
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
+ .fallbackByTimestampNTZ()
.fallbackByTestInjects()
.build()
}
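
Note that containsNTZ recurses through nested types, so an NTZ field buried in
a struct, array, or map also triggers the fallback; matching on catalogString
rather than on TimestampNTZType itself keeps the validator source-compatible
with Spark versions that predate the type. A small sketch of why the recursion
is needed (Spark 3.4+ types, standalone):

    import org.apache.spark.sql.types._

    // The top-level catalogString of a nested type is not "timestamp_ntz",
    // so a flat string check alone would miss this schema.
    val nested = ArrayType(StructType(Seq(StructField("ts", TimestampNTZType))))
    assert(nested.catalogString == "array<struct<ts:timestamp_ntz>>")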
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]