This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new ea11e50114b0 [SPARK-55495][CORE] Fix `EventLogFileWriters.closeWriter` to handle `checkError`
ea11e50114b0 is described below

commit ea11e50114b058c76dcb18d25170a52039c37784
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Feb 12 11:07:41 2026 -0800

    [SPARK-55495][CORE] Fix `EventLogFileWriters.closeWriter` to handle `checkError`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `EventLogFileWriters.closeWriter` to handle `checkError`. In general, we need the following three changes (a minimal sketch follows the list).
    
    1. Do `flush` first before closing to isolate any problems at this layer.
    2. Do `PrintWriter.close` and fall back to the underlying Hadoop file stream's `close` API.
    3. Show warnings properly if `checkError` returns true.
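    
    A minimal, self-contained sketch of these three steps (illustrative only: `CloseWriterSketch` and `warn` are hypothetical stand-ins, not the actual Spark members; the real change is in `EventLogFileWriters.scala` further down in this diff):
    
    ```scala
    import java.io.{OutputStream, PrintWriter}
    
    object CloseWriterSketch {
      // Hypothetical stand-in for Spark's logging.
      private def warn(msg: String): Unit = System.err.println(s"WARN: $msg")
    
      def closeWriter(
          writer: Option[PrintWriter],
          underlyingStream: Option[OutputStream]): Unit = {
        // 1. Flush first so that any buffered I/O error surfaces at this layer.
        writer.foreach(_.flush())
        if (writer.exists(_.checkError())) {
          warn("Errors detected while flushing event logs.")
        }
        // 2. Close the PrintWriter and check for errors again.
        writer.foreach(_.close())
        if (writer.exists(_.checkError())) {
          warn("Errors detected while closing event logs.")
          // 3. Best effort: close the underlying stream directly.
          underlyingStream.foreach(_.close())
        }
      }
    }
    ```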
    
    ### Why are the changes needed?
    
    Currently, Apache Spark's event log writer naively invokes `PrintWriter.close()` without error handling.
    
    https://github.com/apache/spark/blob/4e1cb88bba0c031f54dd07e3adc0d464d45cbfce/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala#L80
    
    https://github.com/apache/spark/blob/4e1cb88bba0c031f54dd07e3adc0d464d45cbfce/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala#L133-L135
    
    However, the Java community recommends using `checkError` with `PrintWriter.flush` and `PrintWriter.close`.
    - https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/io/PrintWriter.html#checkError()
    
    When `checkError` returns `true`, a user can silently lose their event log; for example, the event log may never be uploaded. Spark should show a proper warning and at least make a best effort to flush or close the underlying Hadoop file streams.
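    
    A minimal illustration of the underlying behavior, outside the Spark code base (`CheckErrorDemo` and the failing stream are hypothetical): `PrintWriter` swallows `IOException`s from the wrapped stream, so only `checkError` reports the failure.
    
    ```scala
    import java.io.{IOException, OutputStream, PrintWriter}
    
    object CheckErrorDemo {
      def main(args: Array[String]): Unit = {
        // A stream that fails on every write, e.g. because the disk is full.
        val failing = new OutputStream {
          override def write(b: Int): Unit = throw new IOException("disk full")
        }
        val pw = new PrintWriter(failing)
        pw.println("event")       // buffered; nothing reaches the stream yet
        pw.close()                // flushes internally; the IOException is swallowed
        println(pw.checkError())  // prints true: the only signal that data was lost
      }
    }
    ```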
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a bug fix for a corner case.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: `Opus 4.5` on `Claude Code`
    
    Closes #54280 from dongjoon-hyun/SPARK-55495.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 3484a4acf13aafb05f49a51a2adef9c38d5daacc)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/deploy/history/EventLogFileWriters.scala |  13 ++
 .../deploy/history/EventLogFileWritersSuite.scala  | 146 ++++++++++++++++++++-
 2 files changed, 158 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
index 4e3bee1015ff..7c022c283db4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
@@ -131,7 +131,20 @@ abstract class EventLogFileWriter(
   }
 
   protected def closeWriter(): Unit = {
+    // 1. Flush first to check the errors
+    writer.foreach(_.flush())
+    if (writer.exists(_.checkError())) {
+      logError("Spark detects errors while flushing event logs.")
+    }
+    hadoopDataStream.foreach(_.hflush())
+
+    // 2. Try to close and check the errors
     writer.foreach(_.close())
+    if (writer.exists(_.checkError())) {
+      logError("Spark detects errors while closing event logs.")
+      // 3. Ensuring the underlying stream is closed at least (best-effort).
+      hadoopDataStream.foreach(_.close())
+    }
   }
 
   protected def renameFile(src: Path, dest: Path, overwrite: Boolean): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index d9d6a4f8d35d..00a92c503be4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{File, FileOutputStream, IOException}
+import java.io.{File, FileOutputStream, IOException, OutputStream, PrintWriter}
 import java.net.URI
 
 import scala.collection.mutable
@@ -160,8 +160,152 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
       expectedLines: Seq[String] = Seq.empty): Unit
 }
 
+/**
+ * A test OutputStream that simulates IO errors.
+ */
+class ErrorThrowingOutputStream extends OutputStream {
+  var throwOnWrite: Boolean = false
+  var throwOnFlush: Boolean = false
+  var throwOnClose: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (throwOnWrite) {
+      throw new IOException("Simulated write error")
+    }
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (throwOnWrite) {
+      throw new IOException("Simulated write error")
+    }
+  }
+
+  override def flush(): Unit = {
+    if (throwOnFlush) {
+      throw new IOException("Simulated flush error")
+    }
+  }
+
+  override def close(): Unit = {
+    if (throwOnClose) {
+      throw new IOException("Simulated close error")
+    }
+  }
+}
+
+/**
+ * A testable subclass of SingleEventLogFileWriter that exposes the writer field
+ * and closeWriter method for testing.
+ */
+class TestableSingleEventLogFileWriter(
+    appId: String,
+    appAttemptId: Option[String],
+    logBaseDir: URI,
+    sparkConf: SparkConf,
+    hadoopConf: Configuration)
+  extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {
+
+  def setWriterForTest(pw: PrintWriter): Unit = {
+    writer = Some(pw)
+  }
+
+  def callCloseWriter(): Unit = {
+    closeWriter()
+  }
+}
+
 class SingleEventLogFileWriterSuite extends EventLogFileWritersSuite {
 
+  test("SPARK-55495: closeWriter should log warning when flush error occurs") {
+    val appId = getUniqueApplicationId
+    val attemptId = None
+    val conf = getLoggingConf(testDirPath, None)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+    val writer = new TestableSingleEventLogFileWriter(
+      appId, attemptId, testDirPath.toUri, conf, hadoopConf)
+
+    // Create a PrintWriter with an ErrorThrowingOutputStream
+    val errorStream = new ErrorThrowingOutputStream()
+    val printWriter = new PrintWriter(errorStream)
+
+    // Simulate an error by writing to a stream that throws, so that checkError returns true
+    errorStream.throwOnWrite = true
+    // scalastyle:off println
+    printWriter.println("test") // This will set the error flag
+    // scalastyle:on println
+
+    writer.setWriterForTest(printWriter)
+
+    val logAppender = new LogAppender("closeWriter flush error test")
+    withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) {
+      writer.callCloseWriter()
+    }
+
+    val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+    assert(warningMessages.exists(_.contains("Spark detects errors while flushing")),
+      s"Expected warning message not found. Messages: $warningMessages")
+  }
+
+  test("SPARK-55495: closeWriter should log warning when close error occurs") {
+    val appId = getUniqueApplicationId
+    val attemptId = None
+    val conf = getLoggingConf(testDirPath, None)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+    val writer = new TestableSingleEventLogFileWriter(
+      appId, attemptId, testDirPath.toUri, conf, hadoopConf)
+
+    // Create a PrintWriter with an ErrorThrowingOutputStream that errors on close
+    val errorStream = new ErrorThrowingOutputStream()
+    val printWriter = new PrintWriter(errorStream)
+
+    // First write something successfully
+    // scalastyle:off println
+    printWriter.println("test")
+    // scalastyle:on println
+    printWriter.flush()
+
+    // Now set up to error on close
+    errorStream.throwOnClose = true
+
+    writer.setWriterForTest(printWriter)
+
+    val logAppender = new LogAppender("closeWriter close error test")
+    withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) {
+      writer.callCloseWriter()
+    }
+
+    val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+    assert(warningMessages.exists(_.contains("Spark detects errors while closing")),
+      s"Expected warning message not found. Messages: $warningMessages")
+  }
+
+  test("SPARK-55495: closeWriter should complete without warnings when no 
errors") {
+    val appId = getUniqueApplicationId
+    val attemptId = None
+    val conf = getLoggingConf(testDirPath, None)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+
+    val writer = new TestableSingleEventLogFileWriter(
+      appId, attemptId, testDirPath.toUri, conf, hadoopConf)
+
+    // Create a normal PrintWriter with no errors
+    val normalStream = new ErrorThrowingOutputStream()
+    val printWriter = new PrintWriter(normalStream)
+
+    writer.setWriterForTest(printWriter)
+
+    val logAppender = new LogAppender("closeWriter no error test")
+    withLogAppender(logAppender, level = Some(org.apache.logging.log4j.Level.WARN)) {
+      writer.callCloseWriter()
+    }
+
+    val warningMessages = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+    assert(!warningMessages.exists(_.contains("Spark detects errors")),
+      s"Unexpected warning message found. Messages: $warningMessages")
+  }
+
   test("Log overwriting") {
     val appId = "test"
     val appAttemptId = None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
