This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c095b95decc8 [SPARK-52824][SS] CheckpointFileManager error classification
c095b95decc8 is described below

commit c095b95decc896bb70c74dacd717a7eecb9f19be
Author: Livia Zhu <livia....@databricks.com>
AuthorDate: Wed Jul 16 19:26:46 2025 -0700

    [SPARK-52824][SS] CheckpointFileManager error classification
    
    ### What changes were proposed in this pull request?
    
    Add a new Spark error class for errors thrown from CheckpointFileManager.create.
    Additionally, unwrap the InvocationTargetException so that the underlying error
    surfaces at the top level.
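    
    In short, the change catches the reflective wrapper and rethrows a classified
    SparkException whose cause is the real failure. A minimal sketch of that pattern,
    using the same identifiers as the diff below:
    
        try {
          Utils.classForName(fileManagerClass)
            .getConstructor(classOf[Path], classOf[Configuration])
            .newInstance(path, hadoopConf)
            .asInstanceOf[CheckpointFileManager]
        } catch {
          case e: InvocationTargetException if e.getCause != null =>
            // Surface the real constructor failure, not the reflection wrapper.
            throw StreamingErrors.cannotLoadCheckpointFileManagerClass(
              path.toString, fileManagerClass, e.getCause)
        }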
    
    ### Why are the changes needed?
    
    Error classification; also, users are confused by the InvocationTargetException
    thrown from this class.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, new error classification
    
    ### How was this patch tested?
    
    New unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51518 from liviazhu/liviazhu-db/checkpointfilemanager.
    
    Authored-by: Livia Zhu <livia....@databricks.com>
    Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
---
 .../src/main/resources/error/error-conditions.json | 18 +++++++
 .../streaming/CheckpointFileManager.scala          | 20 ++++++--
 .../sql/execution/streaming/StreamingErrors.scala  | 42 ++++++++++++++++
 .../streaming/CheckpointFileManagerSuite.scala     | 57 +++++++++++++++++++++-
 4 files changed, 132 insertions(+), 5 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 0a94270dd89f..25d6dc822df3 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -273,6 +273,24 @@
     ],
     "sqlState" : "0A000"
   },
+  "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER" : {
+    "message" : [
+      "Error loading streaming checkpoint file manager for path=<path>."
+    ],
+    "subClass" : {
+      "ERROR_LOADING_CLASS" : {
+        "message" : [
+          "Error instantiating streaming checkpoint file manager for path=<path> with className=<className>. msg=<msg>."
+        ]
+      },
+      "UNCATEGORIZED" : {
+        "message" : [
+          ""
+        ]
+      }
+    },
+    "sqlState" : "58030"
+  },
   "CANNOT_LOAD_FUNCTION_CLASS" : {
     "message" : [
       "Cannot load class <className> when registering the function 
<functionName>, please make sure it is on the classpath."
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 982cc13c4086..793215fb2456 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.{FileNotFoundException, OutputStream}
+import java.lang.reflect.InvocationTargetException
 import java.util.{EnumSet, UUID}
 
 import scala.util.control.NonFatal
@@ -200,10 +201,19 @@ object CheckpointFileManager extends Logging {
     val fileManagerClass = hadoopConf.get(
       SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
     if (fileManagerClass != null) {
-      return Utils.classForName(fileManagerClass)
-        .getConstructor(classOf[Path], classOf[Configuration])
-        .newInstance(path, hadoopConf)
-        .asInstanceOf[CheckpointFileManager]
+      try {
+        return Utils.classForName(fileManagerClass)
+          .getConstructor(classOf[Path], classOf[Configuration])
+          .newInstance(path, hadoopConf)
+          .asInstanceOf[CheckpointFileManager]
+      } catch {
+        case e: InvocationTargetException if e.getCause != null =>
+          throw StreamingErrors.cannotLoadCheckpointFileManagerClass(path.toString,
+            fileManagerClass, e.getCause)
+        case NonFatal(e) =>
+          throw StreamingErrors.cannotLoadCheckpointFileManagerClass(path.toString,
+              fileManagerClass, e)
+      }
     }
     try {
      // Try to create a manager based on `FileContext` because HDFS's `FileContext.rename()
@@ -218,6 +228,8 @@ object CheckpointFileManager extends Logging {
             log"the implementation of FileSystem.rename() is not atomic, then 
the correctness " +
             log"and fault-tolerance of your Structured Streaming is not 
guaranteed.")
         new FileSystemBasedCheckpointFileManager(path, hadoopConf)
+      case NonFatal(e) =>
+        throw StreamingErrors.cannotLoadCheckpointFileManager(path.toString, e)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
new file mode 100644
index 000000000000..98b8832ee2a8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingErrors.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.SparkException
+
+/**
+ * Object for grouping error messages from streaming query exceptions
+ */
+object StreamingErrors {
+  def cannotLoadCheckpointFileManagerClass(path: String, className: String, err: Throwable):
+  Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.ERROR_LOADING_CLASS",
+      messageParameters = Map("path" -> path, "className" -> className, "msg" -> err.toString),
+      cause = err
+    )
+  }
+
+  def cannotLoadCheckpointFileManager(path: String, err: Throwable):
+  Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.UNCATEGORIZED",
+      messageParameters = Map("path" -> path),
+      cause = err
+    )
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index cdf736b1fffc..8580aaf69523 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -24,7 +24,7 @@ import scala.util.Random
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
@@ -145,6 +145,56 @@ class CheckpointFileManagerSuite extends SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-52824: CheckpointFileManager.create() does not throw 
InvocationTargetException") {
+    withSQLConf(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+        classOf[ConstructorFailureTestManager].getName) {
+      val ex = intercept[SparkException] {
+        CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf())
+      }
+      checkError(
+        ex,
+        condition = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.ERROR_LOADING_CLASS",
+        parameters = Map(
+          "path" -> "/",
+          "className" -> classOf[ConstructorFailureTestManager].getName,
+          "msg" -> "java.lang.IllegalStateException: error")
+      )
+    }
+  }
+
+  test("SPARK-52824: CheckpointFileManager.create() throws uncategorized 
error") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    // Set invalid fs.defaultFS to trigger uncategorized error from URI.create
+    hadoopConf.set("fs.defaultFS", "|invalid/")
+    val ex = intercept[SparkException] {
+      CheckpointFileManager.create(new Path("/"), hadoopConf)
+    }
+    checkError(
+      ex,
+      condition = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.UNCATEGORIZED",
+      parameters = Map("path" -> "/")
+    )
+  }
+
+  test("SPARK-52824: CheckpointFileManager.create() throws error when class 
cannot be found") {
+    withSQLConf(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key ->
+        "notarealclass") {
+      val ex = intercept[SparkException] {
+        CheckpointFileManager.create(new Path("/"), spark.sessionState.newHadoopConf())
+      }
+      checkError(
+        ex,
+        condition = "CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.ERROR_LOADING_CLASS",
+        parameters = Map(
+          "path" -> "/",
+          "className" -> "notarealclass",
+          "msg" -> "java.lang.ClassNotFoundException: notarealclass")
+      )
+    }
+  }
 }
 
 abstract class CheckpointFileManagerTestsOnLocalFs
@@ -222,6 +272,11 @@ object CreateAtomicTestManager {
   @volatile var cancelCalledInCreateAtomic = false
 }
 
+/** A fake implementation to test constructor failure */
+class ConstructorFailureTestManager(path: Path, hadoopConf: Configuration)
+  extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {
+  throw new IllegalStateException("error")
+}
 
 /**
  * CheckpointFileManagerSuiteFileSystem to test fallback of the CheckpointFileManager
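
For reference, a hedged sketch of what a caller observes after this change when a
configured manager class fails to load: the exception is a classified SparkException
whose getCause is the real constructor failure rather than an InvocationTargetException.
It is modeled on the test suite above; the example object, method name, and parameters
are illustrative assumptions, not part of the commit.

    package org.apache.spark.sql.execution.streaming

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    import org.apache.spark.SparkException

    // Illustrative caller living in the same package as CheckpointFileManager
    // (as the test suite does) so that create() is accessible.
    object CheckpointManagerLoadExample {
      def load(checkpointDir: String, hadoopConf: Configuration): CheckpointFileManager = {
        try {
          CheckpointFileManager.create(new Path(checkpointDir), hadoopConf)
        } catch {
          case e: SparkException =>
            // The message carries the CANNOT_LOAD_CHECKPOINT_FILE_MANAGER.* condition,
            // and the cause is the underlying failure (e.g. a ClassNotFoundException or
            // the constructor's own exception), not an InvocationTargetException.
            println(s"Cannot load checkpoint file manager: ${e.getMessage} (cause: ${e.getCause})")
            throw e
        }
      }
    }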


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
