This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3bdcf90a7f54 [SPARK-53123][CORE][SQL] Support `getRootCause` in `SparkErrorUtils` 3bdcf90a7f54 is described below commit 3bdcf90a7f543dc9392116b0407286c52d93a699 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Tue Aug 5 19:29:16 2025 +0800 [SPARK-53123][CORE][SQL] Support `getRootCause` in `SparkErrorUtils` ### What changes were proposed in this pull request? This PR aims to support `getRootCause` in `SparkErrorUtils`. ### Why are the changes needed? To improve Spark's error utility functions. ### Does this PR introduce _any_ user-facing change? No behavior change. ### How was this patch tested? `SparkErrorUtilsSuite` has been newly added, and the new test cases refer to https://github.com/apache/commons-lang/blob/5b59487808d0b86a94ad7a1ebd4c200e09b7be09/src/test/java/org/apache/commons/lang3/exception/ExceptionUtilsTest.java#L334-L341 ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51842 from LuciferYang/SPARK-53123. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../org/apache/spark/util/SparkErrorUtils.scala | 39 ++++++++ .../apache/spark/util/SparkErrorUtilsSuite.scala | 102 +++++++++++++++++++++ scalastyle-config.xml | 5 + .../spark/sql/catalyst/xml/StaxXmlParser.scala | 4 +- .../execution/datasources/xml/XmlDataSource.scala | 3 +- .../sql/execution/datasources/json/JsonSuite.scala | 3 +- .../sql/execution/datasources/xml/XmlSuite.scala | 12 +-- .../thriftserver/ThriftServerQueryTestSuite.scala | 4 +- 8 files changed, 157 insertions(+), 15 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala index 40b9fe3cda1f..c2b56fe85b7f 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.util import java.io.{Closeable, IOException, PrintWriter} import java.nio.charset.StandardCharsets.UTF_8 +import scala.annotation.tailrec +import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.internal.{Logging, LogKeys, MDC} @@ -106,6 +108,43 @@ private[spark] trait SparkErrorUtils extends Logging { new String(out.toByteArray, UTF_8) } + /** + * Walks the [[Throwable]] to obtain its root cause. + * + * This method walks through the exception chain until the last element, + * the root cause of the chain, using `getCause()`, and + * returns that exception. + * + * This method handles recursive cause chains that might + * otherwise cause infinite loops. The cause chain is processed until + * the end, or until the next item in the chain is already + * processed. If we detect a loop, then return the element before the loop. 
+ * + * @param throwable the throwable to get the root cause for, may be null + * @return the root cause of the [[Throwable]], `null` if null throwable input + */ + def getRootCause(throwable: Throwable): Throwable = { + @tailrec + def findRoot( + current: Throwable, + visited: mutable.Set[Throwable] = mutable.Set.empty): Throwable = { + if (current == null) null + else { + visited += current + val cause = current.getCause + if (cause == null) { + current + } else if (visited.contains(cause)) { + current + } else { + findRoot(cause, visited) + } + } + } + + findRoot(throwable) + } + /** Try to close by ignoring all exceptions. This is different from JavaUtils.closeQuietly. */ def closeQuietly(closeable: Closeable): Unit = { if (closeable != null) { diff --git a/common/utils/src/test/scala/org/apache/spark/util/SparkErrorUtilsSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/SparkErrorUtilsSuite.scala new file mode 100644 index 000000000000..ad1a005d8baa --- /dev/null +++ b/common/utils/src/test/scala/org/apache/spark/util/SparkErrorUtilsSuite.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import scala.annotation.nowarn + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +class SparkErrorUtilsSuite + extends AnyFunSuite // scalastyle:ignore funsuite + with BeforeAndAfterEach { + + private var withoutCause: Throwable = _ + private var withCause: Throwable = _ + private var jdkNoCause: Throwable = _ + private var nested: NestableException = _ + private var cyclicCause: ExceptionWithCause = _ + + override def beforeEach(): Unit = { + withoutCause = createExceptionWithoutCause + nested = new NestableException(withoutCause) + withCause = new ExceptionWithCause(nested) + jdkNoCause = new NullPointerException + val exceptionA = new ExceptionWithCause(null.asInstanceOf[Throwable]) + val exceptionB = new ExceptionWithCause(exceptionA) + exceptionA.setCauseValue(exceptionB) + cyclicCause = new ExceptionWithCause(exceptionA) + } + + override def afterEach(): Unit = { + withoutCause = null + nested = null + withCause = null + jdkNoCause = null + cyclicCause = null + } + + test("getRootCause") { + assert(SparkErrorUtils.getRootCause(null) == null) + assert(SparkErrorUtils.getRootCause(withoutCause) == withoutCause) + assert(SparkErrorUtils.getRootCause(nested) == withoutCause) + assert(SparkErrorUtils.getRootCause(withCause) == withoutCause) + assert(SparkErrorUtils.getRootCause(jdkNoCause) == jdkNoCause) + assert(SparkErrorUtils.getRootCause(cyclicCause) == cyclicCause.getCause.getCause) + } + + private def createExceptionWithoutCause: Throwable = + try throw new ExceptionWithoutCause + catch { + case t: Throwable => t + } + + private final class ExceptionWithoutCause extends Exception { + @nowarn def getTargetException(): Unit = {} + } + + private final class NestableException extends Exception { + def this(t: Throwable) = { + this() + initCause(t) + } + } + + private final class ExceptionWithCause(message: String = null) extends Exception(message) { + + 
private var _cause: Throwable = _ + + def this(message: String, cause: Throwable) = { + this(message) + this._cause = cause + } + + def this(cause: Throwable) = { + this(null.asInstanceOf[String]) + this._cause = cause + } + + override def getCause: Throwable = synchronized { + _cause + } + + def setCauseValue(cause: Throwable): Unit = { + this._cause = cause + } + } +} diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 8c45cad0ec71..2dec5468c51b 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -414,6 +414,11 @@ This file is divided into 3 sections: <customMessage>Use Utils.is(Not)?(Blank|Empty) instead</customMessage> </check> + <check customId="commonslang3getrootcause" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters><parameter name="regex">\bExceptionUtils\.getRootCause\b</parameter></parameters> + <customMessage>Use getRootCause of SparkErrorUtils or Utils instead</customMessage> + </check> + <check customId="commonslang3strings" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> <parameters><parameter name="regex">org\.apache\.commons\.lang3\.Strings\b</parameter></parameters> <customMessage>Use Java String methods instead</customMessage> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala index e3efc362cd92..13531a63d5df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala @@ -33,7 +33,6 @@ import scala.util.control.Exception.allCatch import scala.util.control.NonFatal import scala.xml.SAXException -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hdfs.BlockMissingException import org.apache.hadoop.security.AccessControlException @@ -51,6 +50,7 @@ import 
org.apache.spark.types.variant.{Variant, VariantBuilder} import org.apache.spark.types.variant.VariantBuilder.FieldEntry import org.apache.spark.types.variant.VariantUtil import org.apache.spark.unsafe.types.{UTF8String, VariantVal} +import org.apache.spark.util.Utils class StaxXmlParser( schema: StructType, @@ -687,7 +687,7 @@ class XmlTokenizer( " the content in the missing file during schema inference", e) case NonFatal(e) => - ExceptionUtils.getRootCause(e) match { + Utils.getRootCause(e) match { case _: AccessControlException | _: BlockMissingException => reader.close() reader = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala index 23bca3572539..287ed088a75e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataSource.scala @@ -22,7 +22,6 @@ import java.nio.charset.{Charset, StandardCharsets} import scala.util.control.NonFatal -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hdfs.BlockMissingException @@ -199,7 +198,7 @@ object MultiLineXmlDataSource extends XmlDataSource { logWarning("Skipped missing file", e) Iterator.empty[String] case NonFatal(e) => - ExceptionUtils.getRootCause(e) match { + Utils.getRootCause(e) match { case e @ (_ : AccessControlException | _ : BlockMissingException) => throw e case _: RuntimeException | _: IOException if parsedOptions.ignoreCorruptFiles => logWarning("Skipped the rest of the content in the corrupted file", e) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 
3de2d9cce1b7..70cebd9c9ce4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -26,7 +26,6 @@ import java.util.Locale import java.util.concurrent.atomic.AtomicLong import com.fasterxml.jackson.core.JsonFactory -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.{CompressionCodecFactory, GzipCodec} @@ -3372,7 +3371,7 @@ abstract class JsonSuite ) checkError( - exception = ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException], + exception = Utils.getRootCause(exception).asInstanceOf[SparkRuntimeException], condition = "INVALID_JSON_ROOT_FIELD", parameters = Map.empty ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala index fdc8819658bb..fcc80791f060 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala @@ -30,7 +30,6 @@ import scala.collection.mutable import scala.io.Source import scala.jdk.CollectionConverters._ -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FSDataInputStream import org.apache.hadoop.io.{LongWritable, Text} @@ -52,6 +51,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils class XmlSuite extends QueryTest @@ -215,7 +215,7 @@ class XmlSuite .select("year") .collect() } - 
ExceptionUtils.getRootCause(exception).isInstanceOf[UnsupportedCharsetException] + Utils.getRootCause(exception).isInstanceOf[UnsupportedCharsetException] assert(exception.getMessage.contains("1-9588-osi")) } @@ -2917,8 +2917,8 @@ class XmlSuite .xml(inputFile.toURI.toString) .collect() } - assert(ExceptionUtils.getRootCause(e).isInstanceOf[EOFException]) - assert(ExceptionUtils.getRootCause(e).getMessage === "Unexpected end of input stream") + assert(Utils.getRootCause(e).isInstanceOf[EOFException]) + assert(Utils.getRootCause(e).getMessage === "Unexpected end of input stream") val e2 = intercept[SparkException] { spark.read .option("rowTag", "ROW") @@ -2926,8 +2926,8 @@ class XmlSuite .xml(inputFile.toURI.toString) .collect() } - assert(ExceptionUtils.getRootCause(e2).isInstanceOf[EOFException]) - assert(ExceptionUtils.getRootCause(e2).getMessage === "Unexpected end of input stream") + assert(Utils.getRootCause(e2).isInstanceOf[EOFException]) + assert(Utils.getRootCause(e2).getMessage === "Unexpected end of input stream") } withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { val result = spark.read diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala index 0012451dcf5e..9f8b3b1686a9 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerQueryTestSuite.scala @@ -23,8 +23,6 @@ import java.util.{Locale, MissingFormatArgumentException} import scala.util.control.NonFatal -import org.apache.commons.lang3.exception.ExceptionUtils - import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SQLQueryTestSuite @@ -308,7 +306,7 @@ class ThriftServerQueryTestSuite extends 
SQLQueryTestSuite with SharedThriftServ try { result } catch { - case NonFatal(e) => throw ExceptionUtils.getRootCause(e) + case NonFatal(e) => throw Utils.getRootCause(e) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org