This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a6632ffa16f6 [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser a6632ffa16f6 is described below commit a6632ffa16f6907eba96e745920d571924bf4b63 Author: Vladimir Golubev <vladimir.golu...@databricks.com> AuthorDate: Sat May 11 00:37:54 2024 +0800 [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser # What changes were proposed in this pull request? New lightweight exception for control-flow between UnivocityParser and FailureSafeParser to speed up malformed CSV parsing. This is a different way to implement these reverted changes: https://github.com/apache/spark/pull/46478 The previous implementation was more invasive - removing `cause` from `BadRecordException` could break upper code, which unwraps errors and checks the types of the causes. This implementation only touches `FailureSafeParser` and `UnivocityParser` since in the codebase they are always used together, unlike `JacksonParser` and `StaxXmlParser`. Removing stacktrace from `BadRecordException` is safe, since the cause itself has an adequate stacktrace (except pure control-flow cases). ### Why are the changes needed? Parsing in `PermissiveMode` is slow due to heavy exception construction (stacktrace filling + string template substitution in `SparkRuntimeException`) ### Does this PR introduce _any_ user-facing change? No, since `FailureSafeParser` unwraps `BadRecordException` and correctly rethrows user-facing exceptions in `FailFastMode` ### How was this patch tested? - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite` - Manually ran the CSV benchmark - Manually checked correct and malformed csv in spark-shell (org.apache.spark.SparkException is thrown with the stacktrace) ### Was this patch authored or co-authored using generative AI tooling? 
No Closes #46500 from vladimirg-db/vladimirg-db/use-special-lighweight-exception-for-control-flow-between-univocity-parser-and-failure-safe-parser. Authored-by: Vladimir Golubev <vladimir.golu...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 5 +++-- .../sql/catalyst/util/BadRecordException.scala | 22 +++++++++++++++++++--- .../sql/catalyst/util/FailureSafeParser.scala | 11 +++++++++-- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index a5158d8a22c6..4d95097e1681 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -316,7 +316,7 @@ class UnivocityParser( throw BadRecordException( () => getCurrentInput, () => Array.empty, - QueryExecutionErrors.malformedCSVRecordError("")) + LazyBadRecordCauseWrapper(() => QueryExecutionErrors.malformedCSVRecordError(""))) } val currentInput = getCurrentInput @@ -326,7 +326,8 @@ class UnivocityParser( // However, we still have chance to parse some of the tokens. It continues to parses the // tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing // tokens. 
- Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString)) + Some(LazyBadRecordCauseWrapper( + () => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))) } else None // When the length of the returned tokens is identical to the length of the parsed schema, // we just need to: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala index 65a56c1064e4..654b0b8c73e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala @@ -67,16 +67,32 @@ case class PartialResultArrayException( extends Exception(cause) /** - * Exception thrown when the underlying parser meet a bad record and can't parse it. + * Exception thrown when the underlying parser met a bad record and can't parse it. + * The stacktrace is not collected for better preformance, and thus, this exception should + * not be used in a user-facing context. * @param record a function to return the record that cause the parser to fail * @param partialResults a function that returns an row array, which is the partial results of * parsing this bad record. - * @param cause the actual exception about why the record is bad and can't be parsed. + * @param cause the actual exception about why the record is bad and can't be parsed. It's better + * to use `LazyBadRecordCauseWrapper` here to delay heavy cause construction + * until it's needed. 
*/ case class BadRecordException( @transient record: () => UTF8String, @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow], - cause: Throwable) extends Exception(cause) + cause: Throwable) extends Exception(cause) { + override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0) + override def fillInStackTrace(): Throwable = this +} + +/** + * Exception to use as `BadRecordException` cause to delay heavy user-facing exception construction. + * Does not contain stacktrace and used only for control flow + */ +case class LazyBadRecordCauseWrapper(cause: () => Throwable) extends Exception() { + override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0) + override def fillInStackTrace(): Throwable = this +} /** * Exception thrown when the underlying parser parses a JSON array as a struct. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index 10cd159c769b..d9946d1b12ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -78,10 +78,17 @@ class FailureSafeParser[IN]( case StringAsDataTypeException(fieldName, fieldValue, dataType) => throw QueryExecutionErrors.cannotParseStringAsDataTypeError(e.record().toString, fieldName, fieldValue, dataType) - case other => throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( - toResultRow(e.partialResults().headOption, e.record).toString, other) + case causeWrapper: LazyBadRecordCauseWrapper => + throwMalformedRecordsDetectedInRecordParsingError(e, causeWrapper.cause()) + case cause => throwMalformedRecordsDetectedInRecordParsingError(e, cause) } } } } + + private def throwMalformedRecordsDetectedInRecordParsingError( + e: BadRecordException, 
cause: Throwable): Nothing = { + throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( + toResultRow(e.partialResults().headOption, e.record).toString, cause) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org