This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 625a476117 [VL] Enable assert_not_null expression for Velox backend
(#11685)
625a476117 is described below
commit 625a47611770dc7551c5d129e4138e9bff83d748
Author: Kent Yao <[email protected]>
AuthorDate: Thu Mar 12 10:49:12 2026 +0800
[VL] Enable assert_not_null expression for Velox backend (#11685)
Register Spark's AssertNotNull expression mapping for the Velox backend.
The corresponding Velox implementation was merged via
facebookincubator/velox#16562.
---
.../org/apache/gluten/execution/IcebergSuite.scala | 28 ++++++++++++++++++++++
.../gluten/expression/ExpressionMappings.scala | 2 ++
.../gluten/utils/velox/VeloxTestSettings.scala | 7 ++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 21 ++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 22 +++++++++++++++++
.../apache/gluten/expression/ExpressionNames.scala | 1 +
6 files changed, 81 insertions(+)
diff --git
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
index 4af0040fce..fdb3a68f2a 100644
---
a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
+++
b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/IcebergSuite.scala
@@ -664,4 +664,32 @@ abstract class IcebergSuite extends
WholeStageTransformerSuite {
assert(result.head.getString(1) == "test_data")
}
}
+
+ test("assert_not_null with iceberg table") {
+ withTable("iceberg_not_null") {
+ spark.sql("""
+ |CREATE TABLE iceberg_not_null (id BIGINT NOT NULL, name
STRING NOT NULL)
+ |USING iceberg
+ |""".stripMargin)
+ // Insert non-null values should succeed with AssertNotNull offloaded.
+ spark.sql("INSERT INTO iceberg_not_null VALUES (1, 'a'), (2, 'b')")
+ runQueryAndCompare("SELECT * FROM iceberg_not_null") {
+ checkGlutenPlan[IcebergScanTransformer]
+ }
+
+ // Insert from a query with nullable source columns.
+ spark.sql(
+ "INSERT INTO iceberg_not_null SELECT id + 10, CAST(id AS STRING) FROM
iceberg_not_null")
+ val df = runQueryAndCompare("SELECT * FROM iceberg_not_null ORDER BY
id") { _ => }
+ assert(df.count() == 4)
+
+ // Insert null into NOT NULL column should throw.
+ val e = intercept[Exception] {
+ spark.sql("INSERT INTO iceberg_not_null VALUES (null, 'c')").collect()
+ }
+ assert(
+ e.getMessage.contains("null") || e.getMessage.contains("NOT_NULL") ||
+ e.getCause != null && e.getCause.getMessage.contains("null"))
+ }
+ }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index b13aced2a6..77ad2dde09 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -23,6 +23,7 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.execution.ScalarSubquery
@@ -294,6 +295,7 @@ object ExpressionMappings {
Sig[WidthBucket](WIDTH_BUCKET),
Sig[ReplicateRows](REPLICATE_ROWS),
Sig[RaiseError](RAISE_ERROR),
+ Sig[AssertNotNull](ASSERT_NOT_NULL),
Sig[SparkVersion](VERSION),
// Decimal
Sig[UnscaledValue](UNSCALED_VALUE),
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 1207121da7..e134da8306 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -878,6 +878,13 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("NOT NULL checks for atomic top-level fields (byName)")
+ .exclude("NOT NULL checks for atomic top-level fields (byPosition)")
+ .exclude("NOT NULL checks for nested struct fields (byName)")
+ .exclude("NOT NULL checks for nested struct fields (byPosition)")
+ .exclude("NOT NULL checks for nullable array with required element
(byPosition)")
+ .exclude("not null checks for fields inside nullable array (byPosition)")
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 4f7c67daaa..867b16d6d6 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDataSourceV2FunctionSuite]
enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite]
enableSuite[GlutenDataSourceV2SQLSuiteV1Filter]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("CreateTableAsSelect: nullable schema")
enableSuite[GlutenDataSourceV2SQLSuiteV2Filter]
enableSuite[GlutenDataSourceV2Suite]
// Rewrite the following tests in GlutenDataSourceV2Suite.
@@ -785,6 +787,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFilteredScanSuite]
enableSuite[GlutenFiltersSuite]
enableSuite[GlutenInsertSuite]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand")
// the native write staing dir is differnt with vanilla Spark for coustom
partition paths
.exclude("SPARK-35106: Throw exception when rename custom partition paths
returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
@@ -1103,21 +1107,38 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("NOT NULL checks for atomic top-level fields (byName)")
+ .exclude("NOT NULL checks for atomic top-level fields (byPosition)")
+ .exclude("NOT NULL checks for nested struct fields (byName)")
+ .exclude("NOT NULL checks for nested struct fields (byPosition)")
+ .exclude("NOT NULL checks for nullable array with required element
(byPosition)")
+ .exclude("not null checks for fields inside nullable array (byPosition)")
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
// FIXME: complex type result mismatch
.exclude("update nested struct fields")
.exclude("update char/varchar columns")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("update with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateTableSuite]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("update with NOT NULL checks")
enableSuite[GlutenGroupBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("merge with NOT NULL checks")
enableSuite[GlutenFileSourceCustomMetadataStructSuite]
enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
enableSuite[GlutenTableLocationSuite]
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 2d693bcd9e..da37ea5f7f 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -56,6 +56,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenDataSourceV2FunctionSuite]
enableSuite[GlutenDataSourceV2SQLSessionCatalogSuite]
enableSuite[GlutenDataSourceV2SQLSuiteV1Filter]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("CreateTableAsSelect: nullable schema")
enableSuite[GlutenDataSourceV2SQLSuiteV2Filter]
enableSuite[GlutenDataSourceV2Suite]
// Rewrite the following tests in GlutenDataSourceV2Suite.
@@ -751,6 +753,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFilteredScanSuite]
enableSuite[GlutenFiltersSuite]
enableSuite[GlutenInsertSuite]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand")
// the native write staing dir is differnt with vanilla Spark for coustom
partition paths
.exclude("SPARK-35106: Throw exception when rename custom partition paths
returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
@@ -1089,21 +1093,39 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBitmapExpressionsQuerySuite]
enableSuite[GlutenEmptyInSuite]
enableSuite[GlutenRuntimeNullChecksV2Writes]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("NOT NULL checks for atomic top-level fields (byName)")
+ .exclude("NOT NULL checks for atomic top-level fields (byPosition)")
+ .exclude("NOT NULL checks for nested struct fields (byName)")
+ .exclude("NOT NULL checks for nested struct fields (byPosition)")
+ .exclude("NOT NULL checks for nested structs, arrays, maps (byName)")
+ .exclude("NOT NULL checks for nullable array with required element
(byPosition)")
+ .exclude("not null checks for fields inside nullable array (byPosition)")
enableSuite[GlutenTableOptionsConstantFoldingSuite]
enableSuite[GlutenDeltaBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedMergeIntoTableUpdateAsDeleteAndInsertSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("merge with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateAsDeleteAndInsertTableSuite]
// FIXME: complex type result mismatch
.exclude("update nested struct fields")
.exclude("update char/varchar columns")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("update with NOT NULL checks")
enableSuite[GlutenDeltaBasedUpdateTableSuite]
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("update with NOT NULL checks")
enableSuite[GlutenGroupBasedMergeIntoTableSuite]
// Replaced by Gluten versions that handle wrapped exceptions
.excludeByPrefix("merge cardinality check with")
+ // Velox assert_not_null throws VeloxUserError instead of
SparkRuntimeException
+ .exclude("merge with NOT NULL checks")
enableSuite[GlutenFileSourceCustomMetadataStructSuite]
enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
enableSuite[GlutenTableLocationSuite]
diff --git
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index f2ae5647b1..b168cf4fa2 100644
---
a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++
b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -348,6 +348,7 @@ object ExpressionNames {
final val VERSION = "version"
final val AT_LEAST_N_NON_NULLS = "at_least_n_non_nulls"
final val ASSERT_TRUE = "assert_true"
+ final val ASSERT_NOT_NULL = "assert_not_null"
final val NULLIF = "nullif"
final val NVL = "nvl"
final val NVL2 = "nvl2"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]