This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 191a4a4044c3 [SPARK-55864][CORE][TESTS] Add more tests for SHS 
multiple log directories feature
191a4a4044c3 is described below

commit 191a4a4044c33c70c372135565625363b834e2d5
Author: Kousuke Saruta <[email protected]>
AuthorDate: Sat Mar 7 10:24:03 2026 -0800

    [SPARK-55864][CORE][TESTS] Add more tests for SHS multiple log directories 
feature
    
    ### What changes were proposed in this pull request?
    This PR proposes to add more tests for the SHS multiple log directories 
feature added in SPARK-55793 (#54575).
    New tests include:
    
    - **directory removed while SHS is running** — verifies that removing a log 
directory at runtime does not crash the scan and apps from remaining 
directories are still listed
    - **directory does not exist at startup but created later** — verifies that 
a directory that doesn't exist at startup is picked up on subsequent scans 
(monthly directory scenario)
    - **directory temporarily inaccessible then recovers** — verifies that apps 
reappear after a temporarily inaccessible directory is restored
    - **all directories inaccessible does not crash** — verifies graceful 
handling when all configured directories become unavailable
    - **config with empty entries between commas** — verifies that empty 
entries in `spark.history.fs.logDirectory` (e.g., `dir1,,dir2`) are handled 
correctly
    - **logDirectory.names count mismatch falls back to full paths** — verifies 
that when the number of names doesn't match the number of directories, display 
names fall back to full paths
    
    ### Why are the changes needed?
    For better test coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Confirmed that all new tests passed.
    ```
    $ build/sbt 'testOnly 
org.apache.spark.deploy.history.RocksDBBackendFsHistoryProviderSuite'
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Kiro CLI / Opus 4.6
    
    Closes #54660 from sarutak/shs-multi-log-dirs-more-tests.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../deploy/history/FsHistoryProviderSuite.scala    | 233 +++++++++++++++++++++
 1 file changed, 233 insertions(+)

diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a1d0b7dc4c05..7c76b50b07ac 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -2148,6 +2148,239 @@ abstract class FsHistoryProviderSuite extends 
SparkFunSuite with Matchers with P
     }
   }
 
+  test("SPARK-55864: directory removed while SHS is running") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+
+      val log1 = newLogFile("app1", None, inProgress = false)
+      writeFile(log1, None,
+        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", 
None),
+        SparkListenerApplicationEnd(5L))
+      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", 
None, None)
+      val log2 = new File(new Path(logUri2).toUri.getPath)
+      writeFile(log2, None,
+        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", 
None),
+        SparkListenerApplicationEnd(6L))
+
+      updateAndCheck(provider) { list =>
+        list.size should be(2)
+      }
+
+      // Remove dir2 while SHS is running
+      Utils.deleteRecursively(dir2)
+
+      // Next scan should not throw and should still list app1 from testDir
+      updateAndCheck(provider) { list =>
+        list.size should be(1)
+        list.head.id should be("app1-id")
+      }
+
+      provider.stop()
+    } finally {
+      if (dir2.exists()) {
+        Utils.deleteRecursively(dir2)
+      }
+    }
+  }
+
+  test("SPARK-55864: directory does not exist at startup but created later") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    val dir2Path = dir2.getAbsolutePath
+    Utils.deleteRecursively(dir2)
+
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2Path}")
+      val provider = new FsHistoryProvider(conf)
+
+      val log1 = newLogFile("app1", None, inProgress = false)
+      writeFile(log1, None,
+        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", 
None),
+        SparkListenerApplicationEnd(5L))
+
+      // First scan: dir2 does not exist, but app1 from testDir should be 
listed
+      updateAndCheck(provider) { list =>
+        list.size should be(1)
+        list.head.id should be("app1-id")
+      }
+
+      // Create dir2 and add a log file
+      dir2.mkdirs()
+      val logUri2 = SingleEventLogFileWriter.getLogPath(new 
File(dir2Path).toURI, "app2", None,
+        None)
+      val log2 = new File(new Path(logUri2).toUri.getPath)
+      writeFile(log2, None,
+        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", 
None),
+        SparkListenerApplicationEnd(6L))
+
+      // Next scan should pick up app2
+      updateAndCheck(provider) { list =>
+        list.size should be(2)
+        list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+      }
+
+      provider.stop()
+    } finally {
+      if (new File(dir2Path).exists()) {
+        Utils.deleteRecursively(new File(dir2Path))
+      }
+    }
+  }
+
+  test("SPARK-55864: directory temporarily inaccessible then recovers") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+
+      val log1 = newLogFile("app1", None, inProgress = false)
+      writeFile(log1, None,
+        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", 
None),
+        SparkListenerApplicationEnd(5L))
+      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", 
None, None)
+      val log2 = new File(new Path(logUri2).toUri.getPath)
+      writeFile(log2, None,
+        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", 
None),
+        SparkListenerApplicationEnd(6L))
+
+      updateAndCheck(provider) { list =>
+        list.size should be(2)
+      }
+
+      // Make dir2 inaccessible by removing it
+      val dir2Backup = Utils.createTempDir(namePrefix = "logDir2Backup")
+      Utils.deleteRecursively(dir2Backup)
+      assert(dir2.renameTo(dir2Backup))
+
+      // Scan should still work for testDir
+      updateAndCheck(provider) { list =>
+        list.size should be(1)
+        list.head.id should be("app1-id")
+      }
+
+      // Restore dir2
+      assert(dir2Backup.renameTo(dir2))
+
+      // Next scan should recover app2
+      updateAndCheck(provider) { list =>
+        list.size should be(2)
+        list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+      }
+
+      provider.stop()
+    } finally {
+      if (dir2.exists()) {
+        Utils.deleteRecursively(dir2)
+      }
+    }
+  }
+
+  test("SPARK-55864: all directories inaccessible does not crash") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+
+      val log1 = newLogFile("app1", None, inProgress = false)
+      writeFile(log1, None,
+        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", 
None),
+        SparkListenerApplicationEnd(5L))
+
+      updateAndCheck(provider) { list =>
+        list.size should be(1)
+      }
+
+      // Remove both directories
+      val testDirBackup = Utils.createTempDir(namePrefix = "testDirBackup")
+      Utils.deleteRecursively(testDirBackup)
+      assert(testDir.renameTo(testDirBackup))
+      Utils.deleteRecursively(dir2)
+
+      try {
+        // Should not throw
+        provider.checkForLogs()
+        // After all dirs gone, listing should return no apps
+        provider.getListing().toSeq.size should be(0)
+      } finally {
+        // Always restore testDir so afterEach / subsequent tests are not 
affected
+        assert(testDirBackup.renameTo(testDir))
+      }
+      provider.stop()
+    } finally {
+      if (dir2.exists()) {
+        Utils.deleteRecursively(dir2)
+      }
+    }
+  }
+
+  test("SPARK-55864: config with empty entries between commas") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      // "dir1,,dir2" - empty entry between commas
+      val conf = createTestConf().set(HISTORY_LOG_DIR,
+        s"${testDir.getAbsolutePath},,${dir2.getAbsolutePath}")
+      val provider = new FsHistoryProvider(conf)
+
+      val log1 = newLogFile("app1", None, inProgress = false)
+      writeFile(log1, None,
+        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", 
None),
+        SparkListenerApplicationEnd(5L))
+      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", 
None, None)
+      val log2 = new File(new Path(logUri2).toUri.getPath)
+      writeFile(log2, None,
+        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", 
None),
+        SparkListenerApplicationEnd(6L))
+
+      updateAndCheck(provider) { list =>
+        list.size should be(2)
+        list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+      }
+
+      provider.stop()
+    } finally {
+      Utils.deleteRecursively(dir2)
+    }
+  }
+
+  test("SPARK-55864: logDirectory.names count mismatch falls back to full 
paths") {
+    val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+    try {
+      val conf = createTestConf()
+        .set(HISTORY_LOG_DIR,
+          s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+        .set(HISTORY_LOG_DIR_NAMES, "OnlyOneName")
+      val provider = new FsHistoryProvider(conf)
+
+      val log1 = newLogFile("app1", None, inProgress = false)
+      writeFile(log1, None,
+        SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test", 
None),
+        SparkListenerApplicationEnd(5L))
+      val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2", 
None, None)
+      val log2 = new File(new Path(logUri2).toUri.getPath)
+      writeFile(log2, None,
+        SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test", 
None),
+        SparkListenerApplicationEnd(6L))
+
+      updateAndCheck(provider) { list =>
+        list.size should be(2)
+        // Names mismatch: should fall back to full paths
+        val app1 = list.find(_.id == "app1-id").get
+        val app2 = list.find(_.id == "app2-id").get
+        app1.attempts.head.logSourceName should 
be(Some(testDir.getAbsolutePath))
+        app2.attempts.head.logSourceName should be(Some(dir2.getAbsolutePath))
+      }
+
+      provider.stop()
+    } finally {
+      Utils.deleteRecursively(dir2)
+    }
+  }
+
   private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
     extends FsHistoryProvider(conf, clock) {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to