This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 191a4a4044c3 [SPARK-55864][CORE][TESTS] Add more tests for SHS
multiple log directories feature
191a4a4044c3 is described below
commit 191a4a4044c33c70c372135565625363b834e2d5
Author: Kousuke Saruta <[email protected]>
AuthorDate: Sat Mar 7 10:24:03 2026 -0800
[SPARK-55864][CORE][TESTS] Add more tests for SHS multiple log directories
feature
### What changes were proposed in this pull request?
This PR proposes to add more tests for the SHS multiple log directories feature
added in SPARK-55793 (#54575).
New tests include:
- **directory removed while SHS is running** — verifies that removing a log
directory at runtime does not crash the scan and apps from remaining
directories are still listed
- **directory does not exist at startup but created later** — verifies that
a directory that doesn't exist at startup is picked up on subsequent scans
(monthly directory scenario)
- **directory temporarily inaccessible then recovers** — verifies that apps
reappear after a temporarily inaccessible directory is restored
- **all directories inaccessible does not crash** — verifies graceful
handling when all configured directories become unavailable
- **config with empty entries between commas** — verifies that empty
entries in `spark.history.fs.logDirectory` (e.g., `dir1,,dir2`) are handled
correctly
- **logDirectory.names count mismatch falls back to full paths** — verifies
that when the number of names doesn't match the number of directories, display
names fall back to full paths
### Why are the changes needed?
For better test coverage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Confirmed that all new tests passed.
```
$ build/sbt 'testOnly
org.apache.spark.deploy.history.RocksDBBackendFsHistoryProviderSuite'
```
### Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6
Closes #54660 from sarutak/shs-multi-log-dirs-more-tests.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../deploy/history/FsHistoryProviderSuite.scala | 233 +++++++++++++++++++++
1 file changed, 233 insertions(+)
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a1d0b7dc4c05..7c76b50b07ac 100644
---
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -2148,6 +2148,239 @@ abstract class FsHistoryProviderSuite extends
SparkFunSuite with Matchers with P
}
}
+ test("SPARK-55864: directory removed while SHS is running") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf().set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test",
None),
+ SparkListenerApplicationEnd(5L))
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2",
None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test",
None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ }
+
+ // Remove dir2 while SHS is running
+ Utils.deleteRecursively(dir2)
+
+ // Next scan should not throw and should still list app1 from testDir
+ updateAndCheck(provider) { list =>
+ list.size should be(1)
+ list.head.id should be("app1-id")
+ }
+
+ provider.stop()
+ } finally {
+ if (dir2.exists()) {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+ }
+
+ test("SPARK-55864: directory does not exist at startup but created later") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ val dir2Path = dir2.getAbsolutePath
+ Utils.deleteRecursively(dir2)
+
+ try {
+ val conf = createTestConf().set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2Path}")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test",
None),
+ SparkListenerApplicationEnd(5L))
+
+ // First scan: dir2 does not exist, but app1 from testDir should be
listed
+ updateAndCheck(provider) { list =>
+ list.size should be(1)
+ list.head.id should be("app1-id")
+ }
+
+ // Create dir2 and add a log file
+ dir2.mkdirs()
+ val logUri2 = SingleEventLogFileWriter.getLogPath(new
File(dir2Path).toURI, "app2", None,
+ None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test",
None),
+ SparkListenerApplicationEnd(6L))
+
+ // Next scan should pick up app2
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+ }
+
+ provider.stop()
+ } finally {
+ if (new File(dir2Path).exists()) {
+ Utils.deleteRecursively(new File(dir2Path))
+ }
+ }
+ }
+
+ test("SPARK-55864: directory temporarily inaccessible then recovers") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf().set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test",
None),
+ SparkListenerApplicationEnd(5L))
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2",
None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test",
None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ }
+
+ // Make dir2 inaccessible by removing it
+ val dir2Backup = Utils.createTempDir(namePrefix = "logDir2Backup")
+ Utils.deleteRecursively(dir2Backup)
+ assert(dir2.renameTo(dir2Backup))
+
+ // Scan should still work for testDir
+ updateAndCheck(provider) { list =>
+ list.size should be(1)
+ list.head.id should be("app1-id")
+ }
+
+ // Restore dir2
+ assert(dir2Backup.renameTo(dir2))
+
+ // Next scan should recover app2
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+ }
+
+ provider.stop()
+ } finally {
+ if (dir2.exists()) {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+ }
+
+ test("SPARK-55864: all directories inaccessible does not crash") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf().set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test",
None),
+ SparkListenerApplicationEnd(5L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be(1)
+ }
+
+ // Remove both directories
+ val testDirBackup = Utils.createTempDir(namePrefix = "testDirBackup")
+ Utils.deleteRecursively(testDirBackup)
+ assert(testDir.renameTo(testDirBackup))
+ Utils.deleteRecursively(dir2)
+
+ try {
+ // Should not throw
+ provider.checkForLogs()
+ // After all dirs gone, listing should return no apps
+ provider.getListing().toSeq.size should be(0)
+ } finally {
+ // Always restore testDir so afterEach / subsequent tests are not
affected
+ assert(testDirBackup.renameTo(testDir))
+ }
+ provider.stop()
+ } finally {
+ if (dir2.exists()) {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+ }
+
+ test("SPARK-55864: config with empty entries between commas") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ // "dir1,,dir2" - empty entry between commas
+ val conf = createTestConf().set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},,${dir2.getAbsolutePath}")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test",
None),
+ SparkListenerApplicationEnd(5L))
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2",
None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test",
None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ list.map(_.id).toSet should be(Set("app1-id", "app2-id"))
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+
+ test("SPARK-55864: logDirectory.names count mismatch falls back to full
paths") {
+ val dir2 = Utils.createTempDir(namePrefix = "logDir2")
+ try {
+ val conf = createTestConf()
+ .set(HISTORY_LOG_DIR,
+ s"${testDir.getAbsolutePath},${dir2.getAbsolutePath}")
+ .set(HISTORY_LOG_DIR_NAMES, "OnlyOneName")
+ val provider = new FsHistoryProvider(conf)
+
+ val log1 = newLogFile("app1", None, inProgress = false)
+ writeFile(log1, None,
+ SparkListenerApplicationStart("app1", Some("app1-id"), 1L, "test",
None),
+ SparkListenerApplicationEnd(5L))
+ val logUri2 = SingleEventLogFileWriter.getLogPath(dir2.toURI, "app2",
None, None)
+ val log2 = new File(new Path(logUri2).toUri.getPath)
+ writeFile(log2, None,
+ SparkListenerApplicationStart("app2", Some("app2-id"), 2L, "test",
None),
+ SparkListenerApplicationEnd(6L))
+
+ updateAndCheck(provider) { list =>
+ list.size should be(2)
+ // Names mismatch: should fall back to full paths
+ val app1 = list.find(_.id == "app1-id").get
+ val app2 = list.find(_.id == "app2-id").get
+ app1.attempts.head.logSourceName should
be(Some(testDir.getAbsolutePath))
+ app2.attempts.head.logSourceName should be(Some(dir2.getAbsolutePath))
+ }
+
+ provider.stop()
+ } finally {
+ Utils.deleteRecursively(dir2)
+ }
+ }
+
private class SafeModeTestProvider(conf: SparkConf, clock: Clock)
extends FsHistoryProvider(conf, clock) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]