This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new feec4527211 [SPARK-46012][CORE] EventLogFileReader should not read rolling logs if app status file is missing
feec4527211 is described below

commit feec4527211d83f3ae9f3b9a3673aa1b71dde1a4
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Nov 20 17:50:04 2023 -0800

    [SPARK-46012][CORE] EventLogFileReader should not read rolling logs if app status file is missing
    
    ### What changes were proposed in this pull request?
    
    This PR aims to prevent `EventLogFileReader` from reading rolling event logs if `appStatus` is missing.
    
    ### Why are the changes needed?
    
    Since Apache Spark 3.0.0, `appstatus_` is supposed to exist.
    
    
    https://github.com/apache/spark/blob/839f0c98bd85a14eadad13f8aaac876275ded5a4/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala#L277-L283
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43914 from dongjoon-hyun/SPARK-46012.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 6ca1c67de082269b9337503bff5161f5a2d87225)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/deploy/history/EventLogFileReaders.scala |  3 ++-
 .../deploy/history/EventLogFileReadersSuite.scala  | 31 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
index b21c67a2823..714987a8eb8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
@@ -119,7 +119,8 @@ object EventLogFileReader extends Logging {
     if (isSingleEventLog(status)) {
       Some(new SingleFileEventLogFileReader(fs, status.getPath, Option(status)))
     } else if (isRollingEventLogs(status)) {
-      if (fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isEventLogFile)) {
+      if (fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isEventLogFile) &&
+          fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isAppStatusFile)) {
         Some(new RollingEventLogFilesFileReader(fs, status.getPath))
       } else {
         logDebug(s"Rolling event log directory have no event log file at ${status.getPath}")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index efb83934030..f34f792881f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -229,6 +229,37 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite {
 }
 
 class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite {
+  test("SPARK-46012: appStatus file should exist") {
+    withTempDir { dir =>
+      val appId = getUniqueApplicationId
+      val attemptId = None
+
+      val conf = getLoggingConf(testDirPath)
+      conf.set(EVENT_LOG_ENABLE_ROLLING, true)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILE_SIZE.key, "10m")
+
+      val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+        SparkHadoopUtil.get.newConfiguration(conf))
+
+      writer.start()
+      val dummyStr = "dummy" * 1024
+      writeTestEvents(writer, dummyStr, 1024 * 1024 * 20)
+      writer.stop()
+
+      // Verify a healthy rolling event log directory
+      val logPathCompleted = getCurrentLogPath(writer.logPath, isCompleted = true)
+      val readerOpt = EventLogFileReader(fileSystem, new Path(logPathCompleted))
+      assert(readerOpt.get.isInstanceOf[RollingEventLogFilesFileReader])
+      assert(readerOpt.get.listEventLogFiles.length === 3)
+
+      // Make unhealthy rolling event directory by removing appStatus file.
+      val appStatusFile = fileSystem.listStatus(new Path(logPathCompleted))
+        .find(RollingEventLogFilesWriter.isAppStatusFile).get.getPath
+      fileSystem.delete(appStatusFile, false)
+      assert(EventLogFileReader(fileSystem, new Path(logPathCompleted)).isEmpty)
+    }
+  }
+
   allCodecs.foreach { codecShortName =>
     test(s"rolling event log files - codec $codecShortName") {
       val appId = getUniqueApplicationId


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to