This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 1eb352d2590d [SPARK-52684][SQL] Make CACHE TABLE Commands atomic while encountering execution errors 1eb352d2590d is described below commit 1eb352d2590d4146341456f193c17c344235e1ab Author: Kent Yao <y...@apache.org> AuthorDate: Fri Jul 4 18:13:13 2025 +0800 [SPARK-52684][SQL] Make CACHE TABLE Commands atomic while encountering execution errors ### What changes were proposed in this pull request? This PR makes CACHE TABLE commands atomic while encountering execution errors ### Why are the changes needed? For now, when an AnalysisException occurs, no cache or view will be created, but when an execution error occurs, a view or an erroneous 'cache' is created. ### Does this PR introduce _any_ user-facing change? Yes, but it's a bugfix. It only affects a rare corner case in which a user leverages this bug to create an erroneous 'cache'/view for some particular purposes ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #51374 from yaooqinn/SPARK-52684. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> (cherry picked from commit 08e43d37f5fd1ae892f9da9a29a402ea0c920e4a) Signed-off-by: Kent Yao <y...@apache.org> --- .../execution/datasources/v2/CacheTableExec.scala | 27 +++++++++++++++++++--- .../org/apache/spark/sql/CachedTableSuite.scala | 11 ++++++++- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala index c7f47d2eaaaa..2e295200e9ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.Locale +import scala.util.control.NonFatal + import org.apache.spark.internal.LogKeys.OPTIONS import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} @@ -28,8 +30,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.classic.Dataset import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper -import org.apache.spark.sql.execution.command.CreateViewCommand +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.execution.command.{CreateViewCommand, DropTempViewCommand} import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils trait BaseCacheTableExec extends LeafV2CommandExec { def relationName: String @@ -53,7 +57,16 @@ trait BaseCacheTableExec extends LeafV2CommandExec { if (!isLazy) { // Performs eager caching. - df.count() + try { + df.count() + } catch { + case NonFatal(e) => + // If the query fails, we should remove the cached table. 
+ Utils.tryLogNonFatalError { + session.sharedState.cacheManager.uncacheQuery(session, planToCache, cascade = false) + } + throw e + } } Seq.empty @@ -99,7 +112,15 @@ case class CacheTableAsSelectExec( isAnalyzed = true, referredTempFunctions = referredTempFunctions ).run(session) - super.run() + try { + super.run() + } catch { + case NonFatal(e) => + Utils.tryLogNonFatalError { + DropTempViewCommand(Identifier.of(Array.empty, tempViewName)).run(session) + } + throw e + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index e0ad3feda3ac..0f42502f1d91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -27,7 +27,7 @@ import scala.concurrent.duration._ import org.apache.commons.io.FileUtils -import org.apache.spark.CleanerListener +import org.apache.spark.{CleanerListener, SparkRuntimeException} import org.apache.spark.executor.DataReadMethod._ import org.apache.spark.executor.DataReadMethod.DataReadMethod import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} @@ -1833,4 +1833,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils } } } + + test("SPARK-52684: Atomicity of cache table on error") { + withTempView("SPARK_52684") { + intercept[SparkRuntimeException] { + spark.sql("CACHE TABLE SPARK_52684 AS SELECT raise_error('SPARK-52684') AS c1") + } + assert(!spark.catalog.tableExists("SPARK_52684")) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org