This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new ea11e50114b0 [SPARK-55495][CORE] Fix `EventLogFileWriters.closeWriter` to handle `checkError`
ea11e50114b0 is described below
commit ea11e50114b058c76dcb18d25170a52039c37784
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Feb 12 11:07:41 2026 -0800
[SPARK-55495][CORE] Fix `EventLogFileWriters.closeWriter` to handle `checkError`
### What changes were proposed in this pull request?
This PR aims to fix `EventLogFileWriters.closeWriter` to handle
`checkError`. In general, we need the following three changes (a simplified sketch follows the list).
1. Do `flush` first before closing to isolate any problems at this layer.
2. Do `PrintWriter.close` and fall back to the underlying Hadoop file stream's `close` API.
3. Show warnings properly if `checkError` returns true.
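Below is a simplified, illustrative sketch of that flow. It is not the patch itself (the actual change is in the diff further down); the `Console.err` warnings are plain placeholders for Spark's internal logging.

```scala
import java.io.PrintWriter

import org.apache.hadoop.fs.FSDataOutputStream

// Illustrative sketch only: `writer` and `hadoopDataStream` mirror the
// Option-typed fields of EventLogFileWriter; warnings are simplified.
def closeWriterSketch(
    writer: Option[PrintWriter],
    hadoopDataStream: Option[FSDataOutputStream]): Unit = {
  // 1. Flush first so that buffered-write failures surface at this layer.
  writer.foreach(_.flush())
  if (writer.exists(_.checkError())) {
    Console.err.println("WARN: errors detected while flushing event logs")
  }
  hadoopDataStream.foreach(_.hflush())

  // 2. Close the PrintWriter, then check for errors again.
  writer.foreach(_.close())
  if (writer.exists(_.checkError())) {
    Console.err.println("WARN: errors detected while closing event logs")
    // 3. Best effort: close the underlying Hadoop stream directly.
    hadoopDataStream.foreach(_.close())
  }
}
```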
### Why are the changes needed?
Currently, Apache Spark's event log writer naively invokes
`PrintWriter.close()` without error handling.
https://github.com/apache/spark/blob/4e1cb88bba0c031f54dd07e3adc0d464d45cbfce/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala#L80
https://github.com/apache/spark/blob/4e1cb88bba0c031f54dd07e3adc0d464d45cbfce/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala#L133-L135
However, the Java community recommends calling `checkError` after
`PrintWriter.flush` and `PrintWriter.close`, because `PrintWriter` never throws
`IOException` itself.
- https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/PrintWriter.html#checkError()
When `checkError` returns `true`, a user can lose their event log silently; for
example, the event log may fail to be uploaded without any notice. Spark should
show a proper warning and make a best effort to flush or close the underlying
Hadoop file streams at least.
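To illustrate the failure mode, here is a minimal, self-contained example (not code from this patch): the underlying stream fails, but `PrintWriter` swallows the `IOException`, so without `checkError` the caller never notices.

```scala
import java.io.{IOException, OutputStream, PrintWriter}

// Minimal demonstration: the underlying stream always fails on write,
// yet PrintWriter never propagates the IOException to the caller.
val failing = new OutputStream {
  override def write(b: Int): Unit = throw new IOException("disk full")
}
val pw = new PrintWriter(failing)
pw.println("some event")  // no exception reaches the caller
pw.flush()                // still no exception
assert(pw.checkError())   // the only visible signal of the failure
pw.close()
```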
### Does this PR introduce _any_ user-facing change?
No, this is a bug fix for a corner case.
### How was this patch tested?
Pass the CIs with the newly added test cases.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: `Opus 4.5` on `Claude Code`
Closes #54280 from dongjoon-hyun/SPARK-55495.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 3484a4acf13aafb05f49a51a2adef9c38d5daacc)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/deploy/history/EventLogFileWriters.scala | 13 ++
.../deploy/history/EventLogFileWritersSuite.scala | 146 ++++++++++++++++++++-
2 files changed, 158 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
index 4e3bee1015ff..7c022c283db4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
@@ -131,7 +131,20 @@ abstract class EventLogFileWriter(
}
protected def closeWriter(): Unit = {
+ // 1. Flush first to check the errors
+ writer.foreach(_.flush())
+ if (writer.exists(_.checkError())) {
+ logError("Spark detects errors while flushing event logs.")
+ }
+ hadoopDataStream.foreach(_.hflush())
+
+ // 2. Try to close and check the errors
writer.foreach(_.close())
+ if (writer.exists(_.checkError())) {
+ logError("Spark detects errors while closing event logs.")
+ // 3. Ensuring the underlying stream is closed at least (best-effort).
+ hadoopDataStream.foreach(_.close())
+ }
}
protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index d9d6a4f8d35d..00a92c503be4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.history
-import java.io.{File, FileOutputStream, IOException}
+import java.io.{File, FileOutputStream, IOException, OutputStream, PrintWriter}
import java.net.URI
import scala.collection.mutable
@@ -160,8 +160,152 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
expectedLines: Seq[String] = Seq.empty): Unit
}
+/**
+ * A test OutputStream that simulates IO errors.
+ */
+class ErrorThrowingOutputStream extends OutputStream {
+ var throwOnWrite: Boolean = false
+ var throwOnFlush: Boolean = false
+ var throwOnClose: Boolean = false
+
+ override def write(b: Int): Unit = {
+ if (throwOnWrite) {
+ throw new IOException("Simulated write error")
+ }
+ }
+
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+ if (throwOnWrite) {
+ throw new IOException("Simulated write error")
+ }
+ }
+
+ override def flush(): Unit = {
+ if (throwOnFlush) {
+ throw new IOException("Simulated flush error")
+ }
+ }
+
+ override def close(): Unit = {
+ if (throwOnClose) {
+ throw new IOException("Simulated close error")
+ }
+ }
+}
+
+/**
+ * A testable subclass of SingleEventLogFileWriter that exposes the writer field
+ * and closeWriter method for testing.
+ */
+class TestableSingleEventLogFileWriter(
+ appId: String,
+ appAttemptId: Option[String],
+ logBaseDir: URI,
+ sparkConf: SparkConf,
+ hadoopConf: Configuration)
+ extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {
+
+ def setWriterForTest(pw: PrintWriter): Unit = {
+ writer = Some(pw)
+ }
+
+ def callCloseWriter(): Unit = {
+ closeWriter()
+ }
+}
+
class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {
+ test("SPARK-55495: closeWriter should log warning when flush error occurs") {
+ val appId = getUniqueApplicationId
+ val attemptId = None
+ val conf = getLoggingConf(testDirPath, None)
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+ val writer = new TestableSingleEventLogFileWriter(
+ appId, attemptId, testDirPath.toUri, conf, hadoopConf)
+
+ // Create a PrintWriter with an ErrorThrowingOutputStream
+ val errorStream = new ErrorThrowingOutputStream()
+ val printWriter = new PrintWriter(errorStream)
+
+ // Simulate an error by writing to a stream that throws, causing checkError to return true
+ errorStream.throwOnWrite = true
+ // scalastyle:off println
+ printWriter.println("test") // This will set the error flag
+ // scalastyle:on println
+
+ writer.setWriterForTest(printWriter)
+
+ val logAppender = new LogAppender("closeWriter flush error test")
+ withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) {
+ writer.callCloseWriter()
+ }
+
+ val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+ assert(warningMessages.exists(_.contains("Spark detects errors while
flushing")),
+ s"Expected warning message not found. Messages: $warningMessages")
+ }
+
+ test("SPARK-55495: closeWriter should log warning when close error occurs") {
+ val appId = getUniqueApplicationId
+ val attemptId = None
+ val conf = getLoggingConf(testDirPath, None)
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+ val writer = new TestableSingleEventLogFileWriter(
+ appId, attemptId, testDirPath.toUri, conf, hadoopConf)
+
+ // Create a PrintWriter with an ErrorThrowingOutputStream that errors on close
+ val errorStream = new ErrorThrowingOutputStream()
+ val printWriter = new PrintWriter(errorStream)
+
+ // First write something successfully
+ // scalastyle:off println
+ printWriter.println("test")
+ // scalastyle:on println
+ printWriter.flush()
+
+ // Now set up to error on close
+ errorStream.throwOnClose = true
+
+ writer.setWriterForTest(printWriter)
+
+ val logAppender = new LogAppender("closeWriter close error test")
+ withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) {
+ writer.callCloseWriter()
+ }
+
+ val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+ assert(warningMessages.exists(_.contains("Spark detects errors while
closing")),
+ s"Expected warning message not found. Messages: $warningMessages")
+ }
+
+ test("SPARK-55495: closeWriter should complete without warnings when no
errors") {
+ val appId = getUniqueApplicationId
+ val attemptId = None
+ val conf = getLoggingConf(testDirPath, None)
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+ val writer = new TestableSingleEventLogFileWriter(
+ appId, attemptId, testDirPath.toUri, conf, hadoopConf)
+
+ // Create a normal PrintWriter with no errors
+ val normalStream = new ErrorThrowingOutputStream()
+ val printWriter = new PrintWriter(normalStream)
+
+ writer.setWriterForTest(printWriter)
+
+ val logAppender = new LogAppender("closeWriter no error test")
+ withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) {
+ writer.callCloseWriter()
+ }
+
+ val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+ assert(!warningMessages.exists(_.contains("Spark detects errors")),
+ s"Unexpected warning message found. Messages: $warningMessages")
+ }
+
test("Log overwriting") {
val appId = "test"
val appAttemptId = None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]