This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6a7f6e3978 [VL] Fix raise_error support for constant-folded MapData in
Spark 4.x (#11558)
6a7f6e3978 is described below
commit 6a7f6e397898cc5805f689550bc6f7e537f691a5
Author: Ankita Victor <[email protected]>
AuthorDate: Wed Feb 4 18:11:15 2026 +0530
[VL] Fix raise_error support for constant-folded MapData in Spark 4.x
(#11558)
What changes are proposed in this pull request?
When Spark constant-folds the CreateMap expression in raise_error calls,
the expression becomes a Literal containing MapData instead of a CreateMap with
literal children. The previous implementation couldn't extract the error
message from this optimized form, causing a fallback to vanilla Spark. This fix
extracts the error message directly from the MapData literal.
Also updated the SPARK-52684 test, which previously fell back to vanilla Spark
but is now executed in Gluten and hence throws a SparkException (wrapping a
GlutenException) instead of a SparkRuntimeException.
org.apache.gluten.exception.GlutenException:
org.apache.gluten.exception.GlutenException: Exception: VeloxUserError
Error Source: USER
Error Code: INVALID_ARGUMENT
Reason: SPARK-52684
Retriable: False
DisplayReason: False
Context: Top-level Expression: raise_error(SPARK-52684:VARCHAR)
Function: setStatus
File: /root/velox/velox/expression/EvalCtx.cpp
Line: 184
Stack trace:
# 0 _ZN8facebook5velox7process10StackTraceC1Ei
# 1
_ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bbNS1_4TypeES7_
# 2
_ZN8facebook5velox6detail14veloxCheckFailINS0_14VeloxUserErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
# 3 _ZN8facebook5velox4exec7EvalCtx9setStatusEiRKNS0_6StatusE
# 4
_ZNK8facebook5velox17SelectivityVector15applyToSelectedIZNS0_4exec7EvalCtx22applyToSelectedNoThrowIZNKS3_21SimpleFunctionAdapterINS0_4core9UDFHolderINS0_9functions8sparksql18RaiseErrorFunctionINS3_10VectorExecEEESC_NS0_12UnknownValueENS0_15ConstantCheckerIJNS0_7VarcharEEEEJSG_EEEE7iterateIJNS3_20ConstantVectorReaderISG_EEEEEvRNSJ_12ApplyContextEDpRT_EUlT_E3_ZNS4_22applyToSelectedNoThrowIST_EEvRKS1_SS_EUlSS_E_EEvSW_SS_T0_EUlSS_E_EEvSS_
# 5
_ZNK8facebook5velox4exec21SimpleFunctionAdapterINS0_4core9UDFHolderINS0_9functions8sparksql18RaiseErrorFunctionINS1_10VectorExecEEES8_NS0_12UnknownValueENS0_15ConstantCheckerIJNS0_7VarcharEEEEJSC_EEEE5applyERKNS0_17SelectivityVectorERSt6vectorISt10shared_ptrINS0_10BaseVectorEESaISM_EERKSK_IKNS0_4TypeEERNS1_7EvalCtxERSM_
# 6
_ZN8facebook5velox4exec4Expr13applyFunctionERKNS0_17SelectivityVectorERNS1_7EvalCtxERSt10shared_ptrINS0_10BaseVectorEE
# 7
_ZN8facebook5velox4exec4Expr19evalFlatNoNullsImplERKNS0_17SelectivityVectorERNS1_7EvalCtxERSt10shared_ptrINS0_10BaseVectorEEPKNS1_7ExprSetE
# 8
_ZN8facebook5velox4exec7ExprSet4evalEiibRKNS0_17SelectivityVectorERNS1_7EvalCtxERSt6vectorISt10shared_ptrINS0_10BaseVectorEESaISB_EE
# 9
_ZN8facebook5velox4exec13FilterProject7projectERKNS0_17SelectivityVectorERNS1_7EvalCtxE
# 10 _ZN8facebook5velox4exec13FilterProject9getOutputEv
# 11
_ZZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEEENKUlvE7_clEv
# 12
_ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE
# 13
_ZN8facebook5velox4exec6Driver4nextEPN5folly10SemiFutureINS3_4UnitEEERPNS1_8OperatorERNS1_14BlockingReasonE
# 14 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
# 15 _ZN6gluten24WholeStageResultIterator4nextEv
# 16
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext
# 17 0x00007bf6fc8459c7
How was this patch tested?
UT in CI
Was this patch authored or co-authored using generative AI tooling?
No
---
.../org/apache/gluten/utils/velox/VeloxTestSettings.scala | 2 ++
.../scala/org/apache/spark/sql/GlutenCachedTableSuite.scala | 11 ++++++++++-
.../org/apache/gluten/utils/velox/VeloxTestSettings.scala | 2 ++
.../scala/org/apache/spark/sql/GlutenCachedTableSuite.scala | 11 ++++++++++-
.../org/apache/gluten/sql/shims/spark40/Spark40Shims.scala | 11 ++++++++---
.../org/apache/gluten/sql/shims/spark41/Spark41Shims.scala | 11 ++++++++---
6 files changed, 40 insertions(+), 8 deletions(-)
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 881f0dabb2..ad7796621a 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -865,6 +865,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("InMemoryRelation statistics")
// Extra ColumnarToRow is needed to transform vanilla columnar data to
gluten columnar data.
.exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on
InMemoryTableScan")
+ // Rewritten because native raise_error throws Spark exception
+ .exclude("SPARK-52684: Atomicity of cache table on error")
enableSuite[GlutenFileSourceCharVarcharTestSuite]
enableSuite[GlutenDSV2CharVarcharTestSuite]
enableSuite[GlutenColumnExpressionSuite]
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
index 0afabae6e5..f438812659 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql
import org.apache.gluten.config.GlutenConfig
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -36,6 +36,15 @@ class GlutenCachedTableSuite
super.sparkConf.set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
}
+ testGluten("SPARK-52684: Atomicity of cache table on error") {
+ withTempView("SPARK_52684") {
+ intercept[SparkException] {
+ spark.sql("CACHE TABLE SPARK_52684 AS SELECT
raise_error('SPARK-52684') AS c1")
+ }
+ assert(!spark.catalog.tableExists("SPARK_52684"))
+ }
+ }
+
testGluten("InMemoryRelation statistics") {
sql("CACHE TABLE testData")
spark.table("testData").queryExecution.withCachedData.collect {
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 10741f7fbc..dd10657ed7 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -833,6 +833,8 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("InMemoryRelation statistics")
// Extra ColumnarToRow is needed to transform vanilla columnar data to
gluten columnar data.
.exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on
InMemoryTableScan")
+ // Rewritten because native raise_error throws Spark exception
+ .exclude("SPARK-52684: Atomicity of cache table on error")
enableSuite[GlutenCacheTableInKryoSuite]
enableSuite[GlutenFileSourceCharVarcharTestSuite]
enableSuite[GlutenDSV2CharVarcharTestSuite]
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
index 0afabae6e5..f438812659 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql
import org.apache.gluten.config.GlutenConfig
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -36,6 +36,15 @@ class GlutenCachedTableSuite
super.sparkConf.set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
}
+ testGluten("SPARK-52684: Atomicity of cache table on error") {
+ withTempView("SPARK_52684") {
+ intercept[SparkException] {
+ spark.sql("CACHE TABLE SPARK_52684 AS SELECT
raise_error('SPARK-52684') AS c1")
+ }
+ assert(!spark.catalog.tableExists("SPARK_52684"))
+ }
+ }
+
testGluten("InMemoryRelation statistics") {
sql("CACHE TABLE testData")
spark.table("testData").queryExecution.withCachedData.collect {
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 72c9b27293..f329a18cfc 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, MapData, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.connector.catalog.Table
@@ -732,8 +732,13 @@ class Spark40Shims extends SparkShims {
if children.size == 2 && children.head.isInstanceOf[Literal]
&& children.head.asInstanceOf[Literal].value.toString ==
"errorMessage" =>
Some(children(1))
- case _ =>
- None
+ case lit: Literal if lit.value.isInstanceOf[MapData] =>
+ // Constant-folded CreateMap: look up "errorMessage" in the MapData
+ val mapData = lit.value.asInstanceOf[MapData]
+ (0 until mapData.numElements())
+ .find(i => mapData.keyArray().getUTF8String(i).toString ==
"errorMessage")
+ .map(i => Literal(mapData.valueArray().getUTF8String(i), StringType))
+ case _ => None
}
}
diff --git
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 8aad6394f0..f7ae54cc29 100644
---
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution,
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap,
InternalRowComparableWrapper, MapData, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
@@ -731,8 +731,13 @@ class Spark41Shims extends SparkShims {
if children.size == 2 && children.head.isInstanceOf[Literal]
&& children.head.asInstanceOf[Literal].value.toString ==
"errorMessage" =>
Some(children(1))
- case _ =>
- None
+ case lit: Literal if lit.value.isInstanceOf[MapData] =>
+ // Constant-folded CreateMap: look up "errorMessage" in the MapData
+ val mapData = lit.value.asInstanceOf[MapData]
+ (0 until mapData.numElements())
+ .find(i => mapData.keyArray().getUTF8String(i).toString ==
"errorMessage")
+ .map(i => Literal(mapData.valueArray().getUTF8String(i), StringType))
+ case _ => None
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]