This is an automated email from the ASF dual-hosted git repository.

rui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dde1014dc [GLUTEN-11622][VL] Fallback TimestampNTZ to Spark (#11609)
7dde1014dc is described below

commit 7dde1014dc63fb596acfebf2d630b871bd0470c9
Author: Ankita Victor <[email protected]>
AuthorDate: Thu Feb 19 16:11:33 2026 +0530

    [GLUTEN-11622][VL] Fallback TimestampNTZ to Spark (#11609)
---
 .../VeloxParquetDataTypeValidationSuite.scala      | 14 +++++
 .../apache/spark/sql/utils/SparkArrowUtil.scala    | 17 +++++-
 .../org/apache/gluten/execution/DeltaSuite.scala   | 62 ++++++++++++++++++++++
 .../extension/columnar/validator/Validators.scala  | 26 +++++++++
 4 files changed, 117 insertions(+), 2 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
index cf63cd4453..a5e814dc0f 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxParquetDataTypeValidationSuite.scala
@@ -465,6 +465,20 @@ class VeloxParquetDataTypeValidationSuite extends VeloxWholeStageTransformerSuit
     }
   }
 
+  testWithMinSparkVersion("Fallback for TimestampNTZ type scan", "3.4") {
+    withTempDir {
+      dir =>
+        val path = new File(dir, "ntz_data").toURI.getPath
+        val inputDf =
+          spark.sql("SELECT CAST('2024-01-01 00:00:00' AS TIMESTAMP_NTZ) AS ts_ntz")
+        inputDf.write.format("parquet").save(path)
+        val df = spark.read.format("parquet").load(path)
+        val executedPlan = getExecutedPlan(df)
+        assert(!executedPlan.exists(plan => plan.isInstanceOf[BatchScanExecTransformer]))
+        checkAnswer(df, inputDf)
+    }
+  }
+
   test("Velox Parquet Write") {
     withSQLConf((GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")) {
       withTempDir {
diff --git a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
index da3f5c0708..0e2ce5e309 100644
--- a/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
+++ b/gluten-arrow/src/main/scala/org/apache/spark/sql/utils/SparkArrowUtil.scala
@@ -50,6 +50,8 @@ object SparkArrowUtil {
       } else {
         new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")
       }
+    case dt if dt.catalogString == "timestamp_ntz" =>
+      new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
     case YearMonthIntervalType.DEFAULT =>
       new ArrowType.Interval(IntervalUnit.YEAR_MONTH)
     case _: ArrayType => ArrowType.List.INSTANCE
@@ -72,7 +74,17 @@ object SparkArrowUtil {
     case ArrowType.Binary.INSTANCE => BinaryType
     case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
     case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
-    // TODO: Time unit is not handled.
+    case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND && ts.getTimezone == null =>
+      // TimestampNTZType is only available in Spark 3.4+
+      try {
+        Class
+          .forName("org.apache.spark.sql.types.TimestampNTZType$")
+          .getField("MODULE$")
+          .get(null)
+          .asInstanceOf[DataType]
+      } catch {
+        case _: ClassNotFoundException => TimestampType
+      }
     case _: ArrowType.Timestamp => TimestampType
     case interval: ArrowType.Interval if interval.getUnit == IntervalUnit.YEAR_MONTH =>
       YearMonthIntervalType.DEFAULT
@@ -156,7 +168,8 @@ object SparkArrowUtil {
     }.asJava)
   }
 
-  // TimestampNTZ does not support
+  // TimestampNTZ is not supported for native computation, but the Arrow type mapping is needed
+  // for row-to-columnar transitions when the fallback validator tags NTZ operators.
   def checkSchema(schema: StructType): Boolean = {
     try {
       SparkSchemaUtil.toArrowSchema(schema)
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 57e9028361..4dc487f1ba 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -340,4 +340,66 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
       }
     }
   }
+
+  // TIMESTAMP_NTZ was introduced in Spark 3.4 / Delta 2.4
+  testWithMinSparkVersion(
+    "delta: create table with TIMESTAMP_NTZ should fallback and return correct results",
+    "3.4") {
+    withTable("delta_ntz") {
+      spark.sql("CREATE TABLE delta_ntz(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA")
+      spark.sql("""INSERT INTO delta_ntz VALUES
+                  |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
+      val df = runQueryAndCompare("select * from delta_ntz", noFallBack = false) { _ => }
+      checkAnswer(
+        df,
+        Row(
+          "foo",
+          java.sql.Timestamp.valueOf("2022-01-02 03:04:05.123456"),
+          java.time.LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)))
+    }
+  }
+
+  testWithMinSparkVersion(
+    "delta: TIMESTAMP_NTZ as partition column should fallback and return correct results",
+    "3.4") {
+    withTable("delta_ntz_part") {
+      spark.sql("""CREATE TABLE delta_ntz_part(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ)
+                  |USING DELTA PARTITIONED BY (c3)""".stripMargin)
+      spark.sql("""INSERT INTO delta_ntz_part VALUES
+                  |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456'),
+                  |('bar','2023-06-15 10:30:00.000000','2023-06-15 10:30:00.000000')""".stripMargin)
+      val df = runQueryAndCompare("select * from delta_ntz_part order by c1", noFallBack = false) {
+        _ =>
+      }
+      checkAnswer(
+        df,
+        Seq(
+          Row(
+            "bar",
+            java.sql.Timestamp.valueOf("2023-06-15 10:30:00"),
+            java.time.LocalDateTime.of(2023, 6, 15, 10, 30, 0, 0)),
+          Row(
+            "foo",
+            java.sql.Timestamp.valueOf("2022-01-02 03:04:05.123456"),
+            java.time.LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))
+        )
+      )
+    }
+  }
+
+  testWithMinSparkVersion(
+    "delta: filter on TIMESTAMP_NTZ column should fallback and return correct results",
+    "3.4") {
+    withTable("delta_ntz_filter") {
+      spark.sql("CREATE TABLE delta_ntz_filter(id INT, ts TIMESTAMP_NTZ) USING DELTA")
+      spark.sql("""INSERT INTO delta_ntz_filter VALUES
+                  |(1, '2022-01-01 00:00:00'),
+                  |(2, '2023-01-01 00:00:00'),
+                  |(3, '2024-01-01 00:00:00')""".stripMargin)
+      val df = runQueryAndCompare(
+        "select id from delta_ntz_filter where ts > '2022-06-01 00:00:00'",
+        noFallBack = false) { _ => }
+      checkAnswer(df, Seq(Row(2), Row(3)))
+    }
+  }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index caa6ba16ba..b20663093f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 
 object Validators {
   implicit class ValidatorBuilderImplicits(builder: Validator.Builder) {
@@ -78,6 +79,11 @@ object Validators {
       builder.add(new FallbackByTestInjects())
     }
 
+    /** Fails validation if a plan node's input or output schema contains TimestampNTZType. */
+    def fallbackByTimestampNTZ(): Validator.Builder = {
+      builder.add(new FallbackByTimestampNTZ())
+    }
+
     /**
      * Fails validation on non-scan plan nodes if Gluten is running as scan-only mode. Also, passes
      * validation on filter for the exception that filter + scan is detected. Because filters can be
@@ -212,6 +218,25 @@ object Validators {
     }
   }
 
+  private class FallbackByTimestampNTZ() extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = {
+      def containsNTZ(dataType: DataType): Boolean = dataType match {
+        case dt if dt.catalogString == "timestamp_ntz" => true
+        case st: StructType => st.exists(f => containsNTZ(f.dataType))
+        case at: ArrayType => containsNTZ(at.elementType)
+        case mt: MapType => containsNTZ(mt.keyType) || containsNTZ(mt.valueType)
+        case _ => false
+      }
+      val hasNTZ = plan.output.exists(a => containsNTZ(a.dataType)) ||
+        plan.children.exists(_.output.exists(a => containsNTZ(a.dataType)))
+      if (hasNTZ) {
+        fail(s"${plan.nodeName} has TimestampNTZType in input/output schema")
+      } else {
+        pass()
+      }
+    }
+  }
+
   private class FallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean) extends Validator {
     override def validate(plan: SparkPlan): Validator.OutCome = {
       if (!scanOnly) {
@@ -292,6 +317,7 @@ object Validators {
       .fallbackComplexExpressions()
       .fallbackByBackendSettings()
       .fallbackByUserOptions()
+      .fallbackByTimestampNTZ()
       .fallbackByTestInjects()
       .build()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to