This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a7f6e3978 [VL] Fix raise_error support for constant-folded MapData in 
Spark 4.x (#11558)
6a7f6e3978 is described below

commit 6a7f6e397898cc5805f689550bc6f7e537f691a5
Author: Ankita Victor <[email protected]>
AuthorDate: Wed Feb 4 18:11:15 2026 +0530

    [VL] Fix raise_error support for constant-folded MapData in Spark 4.x 
(#11558)
    
    What changes are proposed in this pull request?
    When Spark constant-folds the CreateMap expression in raise_error calls, 
the expression becomes a Literal containing MapData instead of a CreateMap with 
literal children. The previous implementation couldn't extract the error 
message from this optimized form, causing fallback. This fix ensures proper 
handling.
    
    Also updated test SPARK-52684 which previously would fallback but is now 
executed in Gluten and hence throws a SparkException (wrapping GlutenException 
instead of SparkRuntimeException
    
    org.apache.gluten.exception.GlutenException: 
org.apache.gluten.exception.GlutenException: Exception: VeloxUserError
    Error Source: USER
    Error Code: INVALID_ARGUMENT
    Reason: SPARK-52684
    Retriable: False
    DisplayReason: False
    Context: Top-level Expression: raise_error(SPARK-52684:VARCHAR)
    Function: setStatus
    File: /root/velox/velox/expression/EvalCtx.cpp
    Line: 184
    Stack trace:
    # 0  _ZN8facebook5velox7process10StackTraceC1Ei
    # 1  
_ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bbNS1_4TypeES7_
    # 2  
_ZN8facebook5velox6detail14veloxCheckFailINS0_14VeloxUserErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_
    # 3  _ZN8facebook5velox4exec7EvalCtx9setStatusEiRKNS0_6StatusE
    # 4  
_ZNK8facebook5velox17SelectivityVector15applyToSelectedIZNS0_4exec7EvalCtx22applyToSelectedNoThrowIZNKS3_21SimpleFunctionAdapterINS0_4core9UDFHolderINS0_9functions8sparksql18RaiseErrorFunctionINS3_10VectorExecEEESC_NS0_12UnknownValueENS0_15ConstantCheckerIJNS0_7VarcharEEEEJSG_EEEE7iterateIJNS3_20ConstantVectorReaderISG_EEEEEvRNSJ_12ApplyContextEDpRT_EUlT_E3_ZNS4_22applyToSelectedNoThrowIST_EEvRKS1_SS_EUlSS_E_EEvSW_SS_T0_EUlSS_E_EEvSS_
    # 5  
_ZNK8facebook5velox4exec21SimpleFunctionAdapterINS0_4core9UDFHolderINS0_9functions8sparksql18RaiseErrorFunctionINS1_10VectorExecEEES8_NS0_12UnknownValueENS0_15ConstantCheckerIJNS0_7VarcharEEEEJSC_EEEE5applyERKNS0_17SelectivityVectorERSt6vectorISt10shared_ptrINS0_10BaseVectorEESaISM_EERKSK_IKNS0_4TypeEERNS1_7EvalCtxERSM_
    # 6  
_ZN8facebook5velox4exec4Expr13applyFunctionERKNS0_17SelectivityVectorERNS1_7EvalCtxERSt10shared_ptrINS0_10BaseVectorEE
    # 7  
_ZN8facebook5velox4exec4Expr19evalFlatNoNullsImplERKNS0_17SelectivityVectorERNS1_7EvalCtxERSt10shared_ptrINS0_10BaseVectorEEPKNS1_7ExprSetE
    # 8  
_ZN8facebook5velox4exec7ExprSet4evalEiibRKNS0_17SelectivityVectorERNS1_7EvalCtxERSt6vectorISt10shared_ptrINS0_10BaseVectorEESaISB_EE
    # 9  
_ZN8facebook5velox4exec13FilterProject7projectERKNS0_17SelectivityVectorERNS1_7EvalCtxE
    # 10 _ZN8facebook5velox4exec13FilterProject9getOutputEv
    # 11 
_ZZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEEENKUlvE7_clEv
    # 12 
_ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE
    # 13 
_ZN8facebook5velox4exec6Driver4nextEPN5folly10SemiFutureINS3_4UnitEEERPNS1_8OperatorERNS1_14BlockingReasonE
    # 14 _ZN8facebook5velox4exec4Task4nextEPN5folly10SemiFutureINS3_4UnitEEE
    # 15 _ZN6gluten24WholeStageResultIterator4nextEv
    # 16 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeHasNext
    # 17 0x00007bf6fc8459c7
    How was this patch tested?
    UT in CI
    
    Was this patch authored or co-authored using generative AI tooling?
    No
---
 .../org/apache/gluten/utils/velox/VeloxTestSettings.scala     |  2 ++
 .../scala/org/apache/spark/sql/GlutenCachedTableSuite.scala   | 11 ++++++++++-
 .../org/apache/gluten/utils/velox/VeloxTestSettings.scala     |  2 ++
 .../scala/org/apache/spark/sql/GlutenCachedTableSuite.scala   | 11 ++++++++++-
 .../org/apache/gluten/sql/shims/spark40/Spark40Shims.scala    | 11 ++++++++---
 .../org/apache/gluten/sql/shims/spark41/Spark41Shims.scala    | 11 ++++++++---
 6 files changed, 40 insertions(+), 8 deletions(-)

diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 881f0dabb2..ad7796621a 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -865,6 +865,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("InMemoryRelation statistics")
     // Extra ColumnarToRow is needed to transform vanilla columnar data to 
gluten columnar data.
     .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on 
InMemoryTableScan")
+    // Rewritten because native raise_error throws Spark exception
+    .exclude("SPARK-52684: Atomicity of cache table on error")
   enableSuite[GlutenFileSourceCharVarcharTestSuite]
   enableSuite[GlutenDSV2CharVarcharTestSuite]
   enableSuite[GlutenColumnExpressionSuite]
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
index 0afabae6e5..f438812659 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql
 
 import org.apache.gluten.config.GlutenConfig
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -36,6 +36,15 @@ class GlutenCachedTableSuite
     super.sparkConf.set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
   }
 
+  testGluten("SPARK-52684: Atomicity of cache table on error") {
+    withTempView("SPARK_52684") {
+      intercept[SparkException] {
+        spark.sql("CACHE TABLE SPARK_52684 AS SELECT 
raise_error('SPARK-52684') AS c1")
+      }
+      assert(!spark.catalog.tableExists("SPARK_52684"))
+    }
+  }
+
   testGluten("InMemoryRelation statistics") {
     sql("CACHE TABLE testData")
     spark.table("testData").queryExecution.withCachedData.collect {
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 10741f7fbc..dd10657ed7 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -833,6 +833,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("InMemoryRelation statistics")
     // Extra ColumnarToRow is needed to transform vanilla columnar data to 
gluten columnar data.
     .exclude("SPARK-37369: Avoid redundant ColumnarToRow transition on 
InMemoryTableScan")
+    // Rewritten because native raise_error throws Spark exception
+    .exclude("SPARK-52684: Atomicity of cache table on error")
   enableSuite[GlutenCacheTableInKryoSuite]
   enableSuite[GlutenFileSourceCharVarcharTestSuite]
   enableSuite[GlutenDSV2CharVarcharTestSuite]
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
index 0afabae6e5..f438812659 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql
 
 import org.apache.gluten.config.GlutenConfig
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
@@ -36,6 +36,15 @@ class GlutenCachedTableSuite
     super.sparkConf.set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
   }
 
+  testGluten("SPARK-52684: Atomicity of cache table on error") {
+    withTempView("SPARK_52684") {
+      intercept[SparkException] {
+        spark.sql("CACHE TABLE SPARK_52684 AS SELECT 
raise_error('SPARK-52684') AS c1")
+      }
+      assert(!spark.catalog.tableExists("SPARK_52684"))
+    }
+  }
+
   testGluten("InMemoryRelation statistics") {
     sql("CACHE TABLE testData")
     spark.table("testData").queryExecution.withCachedData.collect {
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 72c9b27293..f329a18cfc 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, MapData, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.connector.catalog.Table
@@ -732,8 +732,13 @@ class Spark40Shims extends SparkShims {
           if children.size == 2 && children.head.isInstanceOf[Literal]
             && children.head.asInstanceOf[Literal].value.toString == 
"errorMessage" =>
         Some(children(1))
-      case _ =>
-        None
+      case lit: Literal if lit.value.isInstanceOf[MapData] =>
+        // Constant-folded CreateMap: look up "errorMessage" in the MapData
+        val mapData = lit.value.asInstanceOf[MapData]
+        (0 until mapData.numElements())
+          .find(i => mapData.keyArray().getUTF8String(i).toString == 
"errorMessage")
+          .map(i => Literal(mapData.valueArray().getUTF8String(i), StringType))
+      case _ => None
     }
   }
 
diff --git 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 8aad6394f0..f7ae54cc29 100644
--- 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
InternalRowComparableWrapper, MapData, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
@@ -731,8 +731,13 @@ class Spark41Shims extends SparkShims {
           if children.size == 2 && children.head.isInstanceOf[Literal]
             && children.head.asInstanceOf[Literal].value.toString == 
"errorMessage" =>
         Some(children(1))
-      case _ =>
-        None
+      case lit: Literal if lit.value.isInstanceOf[MapData] =>
+        // Constant-folded CreateMap: look up "errorMessage" in the MapData
+        val mapData = lit.value.asInstanceOf[MapData]
+        (0 until mapData.numElements())
+          .find(i => mapData.keyArray().getUTF8String(i).toString == 
"errorMessage")
+          .map(i => Literal(mapData.valueArray().getUTF8String(i), StringType))
+      case _ => None
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to