This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 96bfd8370c2 [SPARK-46012][CORE] EventLogFileReader should not read rolling logs if app status file is missing

96bfd8370c2 is described below

commit 96bfd8370c27baf5283646f2f93cb66ab70de844
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Nov 20 17:50:04 2023 -0800

    [SPARK-46012][CORE] EventLogFileReader should not read rolling logs if app status file is missing

    ### What changes were proposed in this pull request?

    This PR aims to prevent `EventLogFileReader` from reading rolling event logs if the `appStatus` file is missing.

    ### Why are the changes needed?

    Since Apache Spark 3.0.0, an `appstatus_` file is supposed to exist in a rolling event log directory.
    https://github.com/apache/spark/blob/839f0c98bd85a14eadad13f8aaac876275ded5a4/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala#L277-L283

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43914 from dongjoon-hyun/SPARK-46012.
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 6ca1c67de082269b9337503bff5161f5a2d87225) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/deploy/history/EventLogFileReaders.scala | 3 ++- .../deploy/history/EventLogFileReadersSuite.scala | 31 ++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala index b21c67a2823..714987a8eb8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala @@ -119,7 +119,8 @@ object EventLogFileReader extends Logging { if (isSingleEventLog(status)) { Some(new SingleFileEventLogFileReader(fs, status.getPath, Option(status))) } else if (isRollingEventLogs(status)) { - if (fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isEventLogFile)) { + if (fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isEventLogFile) && + fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isAppStatusFile)) { Some(new RollingEventLogFilesFileReader(fs, status.getPath)) } else { logDebug(s"Rolling event log directory have no event log file at ${status.getPath}") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala index efb83934030..f34f792881f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -229,6 +229,37 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite { } class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { + test("SPARK-46012: 
appStatus file should exist") { + withTempDir { dir => + val appId = getUniqueApplicationId + val attemptId = None + + val conf = getLoggingConf(testDirPath) + conf.set(EVENT_LOG_ENABLE_ROLLING, true) + conf.set(EVENT_LOG_ROLLING_MAX_FILE_SIZE.key, "10m") + + val writer = createWriter(appId, attemptId, testDirPath.toUri, conf, + SparkHadoopUtil.get.newConfiguration(conf)) + + writer.start() + val dummyStr = "dummy" * 1024 + writeTestEvents(writer, dummyStr, 1024 * 1024 * 20) + writer.stop() + + // Verify a healthy rolling event log directory + val logPathCompleted = getCurrentLogPath(writer.logPath, isCompleted = true) + val readerOpt = EventLogFileReader(fileSystem, new Path(logPathCompleted)) + assert(readerOpt.get.isInstanceOf[RollingEventLogFilesFileReader]) + assert(readerOpt.get.listEventLogFiles.length === 3) + + // Make unhealthy rolling event directory by removing appStatus file. + val appStatusFile = fileSystem.listStatus(new Path(logPathCompleted)) + .find(RollingEventLogFilesWriter.isAppStatusFile).get.getPath + fileSystem.delete(appStatusFile, false) + assert(EventLogFileReader(fileSystem, new Path(logPathCompleted)).isEmpty) + } + } + allCodecs.foreach { codecShortName => test(s"rolling event log files - codec $codecShortName") { val appId = getUniqueApplicationId --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org