This is an automated email from the ASF dual-hosted git repository. ashrigondekar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c095b95decc8 [SPARK-52824][SS] CheckpointFileManager error classification c095b95decc8 is described below commit c095b95decc896bb70c74dacd717a7eecb9f19be Author: Livia Zhu <livia....@databricks.com> AuthorDate: Wed Jul 16 19:26:46 2025 -0700 [SPARK-52824][SS] CheckpointFileManager error classification ### What changes were proposed in this pull request? Add a new Spark error class for errors thrown from CheckpointFileManager.create. Additionally, unwrap InvocationTargetException so that the underlying error is surfaced at the top level. ### Why are the changes needed? To provide proper error classification, and because users are confused by the InvocationTargetException thrown from this class. ### Does this PR introduce _any_ user-facing change? Yes, a new error classification. ### How was this patch tested? New unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #51518 from liviazhu/liviazhu-db/checkpointfilemanager. Authored-by: Livia Zhu <livia....@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> --- .../src/main/resources/error/error-conditions.json | 18 +++++++ .../streaming/CheckpointFileManager.scala | 20 ++++++-- .../sql/execution/streaming/StreamingErrors.scala | 42 ++++++++++++++++ .../streaming/CheckpointFileManagerSuite.scala | 57 +++++++++++++++++++++- 4 files changed, 132 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 0a94270dd89f..25d6dc822df3 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -273,6 +273,24 @@ ], "sqlState" : "0A000" }, + "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER" : { + "message" : [ + "Error loading streaming checkpoint file manager for path=<path>." 
+ ], + "subClass" : { + "ERROR_LOADING_CLASS" : { + "message" : [ + "Error instantiating streaming checkpoint file manager for path=<path> with className=<className>. msg=<msg>." + ] + }, + "UNCATEGORIZED" : { + "message" : [ + "" + ] + } + }, + "sqlState" : "58030" + }, "CANNOT_LOAD_FUNCTION_CLASS" : { "message" : [ "Cannot load class <className> when registering the function <functionName>, please make sure it is on the classpath." diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index 982cc13c4086..793215fb2456 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming import java.io.{FileNotFoundException, OutputStream} +import java.lang.reflect.InvocationTargetException import java.util.{EnumSet, UUID} import scala.util.control.NonFatal @@ -200,10 +201,19 @@ object CheckpointFileManager extends Logging { val fileManagerClass = hadoopConf.get( SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key) if (fileManagerClass != null) { - return Utils.classForName(fileManagerClass) - .getConstructor(classOf[Path], classOf[Configuration]) - .newInstance(path, hadoopConf) - .asInstanceOf[CheckpointFileManager] + try { + return Utils.classForName(fileManagerClass) + .getConstructor(classOf[Path], classOf[Configuration]) + .newInstance(path, hadoopConf) + .asInstanceOf[CheckpointFileManager] + } catch { + case e: InvocationTargetException if e.getCause != null => + throw StreamingErrors.cannotLoadCheckpointFileManagerClass(path.toString, + fileManagerClass, e.getCause) + case NonFatal(e) => + throw StreamingErrors.cannotLoadCheckpointFileManagerClass(path.toString, + fileManagerClass, e) + } } try { // Try to 
create a manager based on `FileContext` because HDFS's `FileContext.rename() @@ -218,6 +228,8 @@ object CheckpointFileManager extends Logging { log"the implementation of FileSystem.rename() is not atomic, then the correctness " + log"and fault-tolerance of your Structured Streaming is not guaranteed.") new FileSystemBasedCheckpointFileManager(path, hadoopConf) + case NonFatal(e) => + throw StreamingErrors.cannotLoadCheckpointFileManager(path.toString, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala new file mode 100644 index 000000000000..98b8832ee2a8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.SparkException + +/** + * Object for grouping error messages from streaming query exceptions + */ +object StreamingErrors { + def cannotLoadCheckpointFileManagerClass(path: String, className: String, err: Throwable): + Throwable = { + new SparkException( + errorClass = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.ERROR_LOADING_CLASS", + messageParameters = Map("path" -> path, "className" -> className, "msg" -> err.toString), + cause = err + ) + } + + def cannotLoadCheckpointFileManager(path: String, err: Throwable): + Throwable = { + new SparkException( + errorClass = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.UNCATEGORIZED", + messageParameters = Map("path" -> path), + cause = err + ) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index cdf736b1fffc..8580aaf69523 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -24,7 +24,7 @@ import scala.util.Random import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream @@ -145,6 +145,56 @@ class CheckpointFileManagerSuite extends SharedSparkSession { } } } + + test("SPARK-52824: CheckpointFileManager.create() does not throw InvocationTargetException") { + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> + classOf[ConstructorFailureTestManager].getName) { + val ex = 
intercept[SparkException] { + CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf()) + } + checkError( + ex, + condition = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.ERROR_LOADING_CLASS", + parameters = Map( + "path" -> "/", + "className" -> classOf[ConstructorFailureTestManager].getName, + "msg" -> "java.lang.IllegalStateException: error") + ) + } + } + + test("SPARK-52824: CheckpointFileManager.create() throws uncategorized error") { + val hadoopConf = spark.sessionState.newHadoopConf() + // Set invalid fs.defaultFS to trigger uncategorized error from URI.create + hadoopConf.set("fs.defaultFS", "|invalid/") + val ex = intercept[SparkException] { + CheckpointFileManager.create(new Path("/"), hadoopConf) + } + checkError( + ex, + condition = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.UNCATEGORIZED", + parameters = Map("path" -> "/") + ) + } + + test("SPARK-52824: CheckpointFileManager.create() throws error when class cannot be found") { + withSQLConf( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> + "notarealclass") { + val ex = intercept[SparkException] { + CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf()) + } + checkError( + ex, + condition = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.ERROR_LOADING_CLASS", + parameters = Map( + "path" -> "/", + "className" -> "notarealclass", + "msg" -> "java.lang.ClassNotFoundException: notarealclass") + ) + } + } } abstract class CheckpointFileManagerTestsOnLocalFs @@ -222,6 +272,11 @@ object CreateAtomicTestManager { @volatile var cancelCalledInCreateAtomic = false } +/** A fake implementation to test constructor failure */ +class ConstructorFailureTestManager(path: Path, hadoopConf: Configuration) + extends FileSystemBasedCheckpointFileManager(path, hadoopConf) { + throw new IllegalStateException("error") +} /** * CheckpointFileManagerSuiteFileSystem to test fallback of the CheckpointFileManager --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org