This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 625a476117 [VL] Enable assert_not_null expression for Velox backend (#11685)
625a476117 is described below

commit 625a47611770dc7551c5d129e4138e9bff83d748
Author: Kent Yao <[email protected]>
AuthorDate: Thu Mar 12 10:49:12 2026 +0800

    [VL] Enable assert_not_null expression for Velox backend (#11685)
    
    Register Spark's AssertNotNull expression mapping for the Velox backend.
    The corresponding Velox implementation was merged via
    facebookincubator/velox#16562.
---
 .../org/apache/gluten/execution/IcebergSuite.scala | 28 ++++++++++++++++++++++
 .../gluten/expression/ExpressionMappings.scala     |  2 ++
 .../gluten/utils/velox/VeloxTestSettings.scala     |  7 ++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     | 21 ++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     | 22 +++++++++++++++++
 .../apache/gluten/expression/ExpressionNames.scala |  1 +
 6 files changed, 81 insertions(+)

diff --git 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
index 4af0040fce..fdb3a68f2a 100644
--- 
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
+++ 
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
@@ -664,4 +664,32 @@ abstract class IcebergSuite extends 
WholeStageTransformerSuite {
       assert(result.head.getString(1) == "test_data")
     }
   }
+
+  test("assert_not_null with iceberg table") {
+    withTable("iceberg_not_null") {
+      spark.sql("""
+                  |CREATE TABLE iceberg_not_null (id BIGINT NOT NULL, name STRING NOT NULL)
+                  |USING iceberg
+                  |""".stripMargin)
+      // Inserting non-null values should succeed with AssertNotNull offloaded.
+      spark.sql("INSERT INTO iceberg_not_null VALUES (1, 'a'), (2, 'b')")
+      runQueryAndCompare("SELECT * FROM iceberg_not_null") {
+        checkGlutenPlan[IcebergScanTransformer]
+      }
+
+      // Inserting from a query with nullable source columns should double the row count.
+      spark.sql(
+        "INSERT INTO iceberg_not_null SELECT id + 10, CAST(id AS STRING) FROM iceberg_not_null")
+      val df = runQueryAndCompare("SELECT * FROM iceberg_not_null ORDER BY id") { _ => }
+      assert(df.count() == 4)
+
+      // Inserting null into a NOT NULL column should throw.
+      val e = intercept[Exception] {
+        spark.sql("INSERT INTO iceberg_not_null VALUES (null, 'c')").collect()
+      }
+      // Throwable.getMessage may return null, so go through Option instead of risking an NPE.
+      val msgs = Seq(e, e.getCause).flatMap(Option(_)).flatMap(t => Option(t.getMessage))
+      assert(msgs.exists(m => m.contains("null") || m.contains("NOT_NULL")), s"unexpected: $e")
+    }
+  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index b13aced2a6..77ad2dde09 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.execution.ScalarSubquery
 
@@ -294,6 +295,7 @@ object ExpressionMappings {
     Sig[WidthBucket](WIDTH_BUCKET),
     Sig[ReplicateRows](REPLICATE_ROWS),
     Sig[RaiseError](RAISE_ERROR),
+    Sig[AssertNotNull](ASSERT_NOT_NULL),
     Sig[SparkVersion](VERSION),
     // Decimal
     Sig[UnscaledValue](UNSCALED_VALUE),
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 1207121da7..e134da8306 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -878,6 +878,13 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenBitmapExpressionsQuerySuite]
   enableSuite[GlutenEmptyInSuite]
   enableSuite[GlutenRuntimeNullChecksV2Writes]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("NOT NULL checks for atomic top-level fields (byName)")
+    .exclude("NOT NULL checks for atomic top-level fields (byPosition)")
+    .exclude("NOT NULL checks for nested struct fields (byName)")
+    .exclude("NOT NULL checks for nested struct fields (byPosition)")
+    .exclude("NOT NULL checks for nullable array with required element 
(byPosition)")
+    .exclude("not null checks for fields inside nullable array (byPosition)")
   enableSuite[GlutenTableOptionsConstantFoldingSuite]
   enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
   enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 4f7c67daaa..867b16d6d6 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataSourceV2FunctionSuite]
   enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite]
   enableSuite[GlutenDataSourceV2SQLSuiteV1Filter]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("CreateTableAsSelect: nullable schema")
   enableSuite[GlutenDataSourceV2SQLSuiteV2Filter]
   enableSuite[GlutenDataSourceV2Suite]
     // Rewrite the following tests in GlutenDataSourceV2Suite.
@@ -785,6 +787,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenFilteredScanSuite]
   enableSuite[GlutenFiltersSuite]
   enableSuite[GlutenInsertSuite]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand")
     // the native write staing dir is differnt with vanilla Spark for coustom 
partition paths
     .exclude("SPARK-35106: Throw exception when rename custom partition paths 
returns false")
     .exclude("Stop task set if FileAlreadyExistsException was thrown")
@@ -1103,21 +1107,38 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenBitmapExpressionsQuerySuite]
   enableSuite[GlutenEmptyInSuite]
   enableSuite[GlutenRuntimeNullChecksV2Writes]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("NOT NULL checks for atomic top-level fields (byName)")
+    .exclude("NOT NULL checks for atomic top-level fields (byPosition)")
+    .exclude("NOT NULL checks for nested struct fields (byName)")
+    .exclude("NOT NULL checks for nested struct fields (byPosition)")
+    .exclude("NOT NULL checks for nullable array with required element 
(byPosition)")
+    .exclude("not null checks for fields inside nullable array (byPosition)")
   enableSuite[GlutenTableOptionsConstantFoldingSuite]
   enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
     // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("merge with NOT NULL checks")
   enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
     // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("merge with NOT NULL checks")
   enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
     // FIXME: complex type result mismatch
     .exclude("update nested struct fields")
     .exclude("update char/varchar columns")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("update with NOT NULL checks")
   enableSuite[GlutenDeltaBasedUpdateTableSuite]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("update with NOT NULL checks")
   enableSuite[GlutenGroupBasedMergeIntoTableSuite]
     // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("merge with NOT NULL checks")
   enableSuite[GlutenFileSourceCustomMetadataStructSuite]
   enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
   enableSuite[GlutenTableLocationSuite]
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 2d693bcd9e..da37ea5f7f 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenDataSourceV2FunctionSuite]
   enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite]
   enableSuite[GlutenDataSourceV2SQLSuiteV1Filter]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("CreateTableAsSelect: nullable schema")
   enableSuite[GlutenDataSourceV2SQLSuiteV2Filter]
   enableSuite[GlutenDataSourceV2Suite]
     // Rewrite the following tests in GlutenDataSourceV2Suite.
@@ -751,6 +753,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenFilteredScanSuite]
   enableSuite[GlutenFiltersSuite]
   enableSuite[GlutenInsertSuite]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand")
     // the native write staing dir is differnt with vanilla Spark for coustom 
partition paths
     .exclude("SPARK-35106: Throw exception when rename custom partition paths 
returns false")
     .exclude("Stop task set if FileAlreadyExistsException was thrown")
@@ -1089,21 +1093,39 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenBitmapExpressionsQuerySuite]
   enableSuite[GlutenEmptyInSuite]
   enableSuite[GlutenRuntimeNullChecksV2Writes]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("NOT NULL checks for atomic top-level fields (byName)")
+    .exclude("NOT NULL checks for atomic top-level fields (byPosition)")
+    .exclude("NOT NULL checks for nested struct fields (byName)")
+    .exclude("NOT NULL checks for nested struct fields (byPosition)")
+    .exclude("NOT NULL checks for nested structs, arrays, maps (byName)")
+    .exclude("NOT NULL checks for nullable array with required element 
(byPosition)")
+    .exclude("not null checks for fields inside nullable array (byPosition)")
   enableSuite[GlutenTableOptionsConstantFoldingSuite]
   enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
     // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("merge with NOT NULL checks")
   enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
     // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("merge with NOT NULL checks")
   enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
     // FIXME: complex type result mismatch
     .exclude("update nested struct fields")
     .exclude("update char/varchar columns")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("update with NOT NULL checks")
   enableSuite[GlutenDeltaBasedUpdateTableSuite]
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("update with NOT NULL checks")
   enableSuite[GlutenGroupBasedMergeIntoTableSuite]
     // Replaced by Gluten versions that handle wrapped exceptions
     .excludeByPrefix("merge cardinality check with")
+    // Velox assert_not_null throws VeloxUserError instead of 
SparkRuntimeException
+    .exclude("merge with NOT NULL checks")
   enableSuite[GlutenFileSourceCustomMetadataStructSuite]
   enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
   enableSuite[GlutenTableLocationSuite]
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index f2ae5647b1..b168cf4fa2 100644
--- 
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ 
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -348,6 +348,7 @@ object ExpressionNames {
   final val VERSION = "version"
   final val AT_LEAST_N_NON_NULLS = "at_least_n_non_nulls"
   final val ASSERT_TRUE = "assert_true"
+  final val ASSERT_NOT_NULL = "assert_not_null"
   final val NULLIF = "nullif"
   final val NVL = "nvl"
   final val NVL2 = "nvl2"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to