This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 1eb352d2590d [SPARK-52684][SQL] Make CACHE TABLE Commands atomic while 
encountering execution errors
1eb352d2590d is described below

commit 1eb352d2590d4146341456f193c17c344235e1ab
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Jul 4 18:13:13 2025 +0800

    [SPARK-52684][SQL] Make CACHE TABLE Commands atomic while encountering 
execution errors
    
    ### What changes were proposed in this pull request?
    
    This PR makes CACHE TABLE commands atomic while encountering execution 
errors
    
    ### Why are the changes needed?
    
    Currently, when an AnalysisException occurs, no cache or view is 
created, but when an execution error occurs, a view or an erroneous 'cache' is still created.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but it's a bugfix. It only affects a rare corner case in which a user 
leverages this bug to create an erroneous 'cache'/view for some particular 
purpose.
    
    ### How was this patch tested?
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #51374 from yaooqinn/SPARK-52684.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 08e43d37f5fd1ae892f9da9a29a402ea0c920e4a)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../execution/datasources/v2/CacheTableExec.scala  | 27 +++++++++++++++++++---
 .../org/apache/spark/sql/CachedTableSuite.scala    | 11 ++++++++-
 2 files changed, 34 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index c7f47d2eaaaa..2e295200e9ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.Locale
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.internal.LogKeys.OPTIONS
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -28,8 +30,10 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.classic.Dataset
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-import org.apache.spark.sql.execution.command.CreateViewCommand
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.command.{CreateViewCommand, 
DropTempViewCommand}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 trait BaseCacheTableExec extends LeafV2CommandExec {
   def relationName: String
@@ -53,7 +57,16 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
 
     if (!isLazy) {
       // Performs eager caching.
-      df.count()
+      try {
+        df.count()
+      } catch {
+        case NonFatal(e) =>
+          // If the query fails, we should remove the cached table.
+          Utils.tryLogNonFatalError {
+            session.sharedState.cacheManager.uncacheQuery(session, 
planToCache, cascade = false)
+          }
+          throw e
+      }
     }
 
     Seq.empty
@@ -99,7 +112,15 @@ case class CacheTableAsSelectExec(
       isAnalyzed = true,
       referredTempFunctions = referredTempFunctions
     ).run(session)
-    super.run()
+    try {
+      super.run()
+    } catch {
+      case NonFatal(e) =>
+        Utils.tryLogNonFatalError {
+          DropTempViewCommand(Identifier.of(Array.empty, 
tempViewName)).run(session)
+        }
+        throw e
+    }
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e0ad3feda3ac..0f42502f1d91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -27,7 +27,7 @@ import scala.concurrent.duration._
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.CleanerListener
+import org.apache.spark.{CleanerListener, SparkRuntimeException}
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, 
SparkListenerJobStart}
@@ -1833,4 +1833,13 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils
       }
     }
   }
+
+  test("SPARK-52684: Atomicity of cache table on error") {
+    withTempView("SPARK_52684") {
+      intercept[SparkRuntimeException] {
+        spark.sql("CACHE TABLE SPARK_52684 AS SELECT 
raise_error('SPARK-52684') AS c1")
+      }
+      assert(!spark.catalog.tableExists("SPARK_52684"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to