This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d68cde812c6f [SPARK-48991][SQL] Move path initialization into
try-catch block in FileStreamSink.hasMetadata
d68cde812c6f is described below
commit d68cde812c6f904d6f01b7fde1eed10b12edd766
Author: Kent Yao <[email protected]>
AuthorDate: Wed Jul 24 19:47:29 2024 +0800
[SPARK-48991][SQL] Move path initialization into try-catch block in
FileStreamSink.hasMetadata
### What changes were proposed in this pull request?
This pull request proposes to move path initialization into the try-catch block
in FileStreamSink.hasMetadata. Exceptions thrown for invalid paths can then be
handled consistently with the other path-related exceptions in the existing
try-catch block. As a result, the errors fall into the correct code branches
and are handled properly.
### Why are the changes needed?
Bugfix for improperly handled exceptions in FileStreamSink.hasMetadata.
### Does this PR introduce _any_ user-facing change?
No. An invalid path is still invalid, but it now fails in the correct place.
### How was this patch tested?
new test
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #47471 from yaooqinn/SPARK-48991.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../spark/sql/execution/streaming/FileStreamSink.scala | 2 +-
.../apache/spark/sql/streaming/FileStreamSinkSuite.scala | 13 +++++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 638da08d0fd9..d561ee1ef730 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -49,8 +49,8 @@ object FileStreamSink extends Logging {
path match {
case Seq(singlePath) =>
- val hdfsPath = new Path(singlePath)
try {
+ val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.getFileStatus(hdfsPath).isDirectory) {
val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 04193d5189ae..2e80588fb282 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -651,6 +651,19 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}
}
+
+ test("SPARK-48991: Move path initialization into try-catch block") {
+ val logAppender = new LogAppender("Assume no metadata directory.")
+ Seq(null, "", "file:tmp").foreach { path =>
+ withLogAppender(logAppender) {
+ assert(!FileStreamSink.hasMetadata(Seq(path),
spark.sessionState.newHadoopConf(), conf))
+ }
+
+
assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
+ "Assume no metadata directory. Error while looking for metadata
directory in the path:" +
+ s" $path."))
+ }
+ }
}
object PendingCommitFilesTrackingManifestFileCommitProtocol {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]