This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
commit 02f80cf293739f4d2881316897dbdcea74daa0bc Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> AuthorDate: Thu Oct 15 15:28:52 2020 +0900 Revert "Revert "[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS"" This reverts commit e40c147a5d194adbba13f12590959dc68347ec14. --- .../spark/deploy/history/FsHistoryProvider.scala | 3 ++ .../deploy/history/FsHistoryProviderSuite.scala | 49 ++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f5e7c4fa..7e63d55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -527,6 +527,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) reader.fileSizeForLastIndex > 0 } catch { case _: FileNotFoundException => false + case NonFatal(e) => + logWarning(s"Error while reading new log ${reader.rootPath}", e) + false } case NonFatal(e) => diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index c2f34fc..f3beb35 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1470,6 +1470,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } } + test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") { + withTempDir { dir => + val conf = createTestConf(true) + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val provider = new FsHistoryProvider(conf) + + val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf) + writer.start() + + writeEventsToRollingWriter(writer, Seq( + SparkListenerApplicationStart("app", Some("app"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + provider.checkForLogs() + provider.cleanLogs() + assert(dir.listFiles().size === 1) + assert(provider.getListing.length === 1) + + // Manually delete the appstatus file to make an invalid rolling event log + val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath), + "app", None, true) + fs.delete(appStatusPath, false) + provider.checkForLogs() + provider.cleanLogs() + assert(provider.getListing.length === 0) + + // Create a new application + val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf) + writer2.start() + writeEventsToRollingWriter(writer2, Seq( + SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false) + + // Both folders exist but only one application found + provider.checkForLogs() + provider.cleanLogs() + assert(provider.getListing.length === 1) + assert(dir.listFiles().size === 2) + + // Make sure a new provider sees the valid application + provider.stop() + val newProvider = new FsHistoryProvider(conf) + newProvider.checkForLogs() + assert(newProvider.getListing.length === 1) + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org