This is an automated email from the ASF dual-hosted git repository.
mridulm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c082f824d4e2 [SPARK-56645][CORE] Fix History Server serving stale UI after app completes
c082f824d4e2 is described below
commit c082f824d4e2e837792fdbb2c381fb77a71ccd9f
Author: cxzl25 <[email protected]>
AuthorDate: Sat Jun 6 00:41:27 2026 -0500
[SPARK-56645][CORE] Fix History Server serving stale UI after app completes
### What changes were proposed in this pull request?
When `mergeApplicationListing()` successfully parses a completed event log, it
now proactively deletes any existing disk store for the app if the app's UI is
not currently tracked in `activeUIs`. Concretely, the following lines are added
after the `invalidateUI()` call in `doMergeApplicationListingInternal`:
```scala
if (app.attempts.head.info.completed) {
  val hasActiveUI = synchronized { activeUIs.contains((appId, attemptId)) }
  if (!hasActiveUI) {
    diskManager.foreach(_.release(appId, attemptId, delete = true))
  }
}
```
### Why are the changes needed?
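There is a race condition between `ApplicationCache`'s LRU eviction and
`FsHistoryProvider.invalidateUI()` that causes the History Server to serve
stale UI data after an in-progress app completes. If the cache evicts the UI
while the app is still in progress, `onUIDetached()` sees a still-valid UI,
keeps the disk store, and removes the entry from `activeUIs`. When the
completed log is later detected, `invalidateUI()` finds nothing in `activeUIs`,
so the disk store built from the in-progress snapshot is never deleted and the
next `getAppUI()` reopens it.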
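
The ordering behind this race can be illustrated with a small toy model. This is only a sketch: `ToyHistoryProvider`, its fields, and `StaleStoreDemo` are hypothetical names that mimic the roles of `FsHistoryProvider`, `activeUIs`, and the disk store manager. They are not Spark APIs, and event logs are reduced to lists of event names.

```scala
import scala.collection.mutable

// Hypothetical toy model, not Spark code. `applyFix` toggles the new cleanup.
final class ToyHistoryProvider(applyFix: Boolean) {
  // appId -> (events currently in the log, whether the log is completed)
  private val logs = mutable.Map.empty[String, (Seq[String], Boolean)]
  // appId -> snapshot of events taken when the disk store was built
  private val diskStores = mutable.Map.empty[String, Seq[String]]
  // appId -> "valid" flag of a UI currently held by the cache
  private val activeUIs = mutable.Map.empty[String, Boolean]

  def writeLog(appId: String, events: Seq[String], completed: Boolean): Unit =
    synchronized { logs.update(appId, (events, completed)) }

  // Reopens an existing disk store if one exists, otherwise builds it from the log.
  def getAppUI(appId: String): Seq[String] = synchronized {
    val store = diskStores.getOrElseUpdate(appId, logs(appId)._1)
    activeUIs.update(appId, true)
    store
  }

  // Cache eviction: a still-valid UI keeps its disk store, an invalidated one loses it.
  def onUIDetached(appId: String): Unit = synchronized {
    val valid = activeUIs.remove(appId).getOrElse(true)
    if (!valid) diskStores.remove(appId)
  }

  // Listing scan: invalidation only reaches UIs that are still tracked.
  def checkForLogs(): Unit = synchronized {
    logs.foreach { case (appId, (_, completed)) =>
      if (activeUIs.contains(appId)) activeUIs.update(appId, false)
      // The fix: a completed app with no tracked UI gets its disk store deleted.
      if (applyFix && completed && !activeUIs.contains(appId)) diskStores.remove(appId)
    }
  }
}

object StaleStoreDemo {
  // Load the UI, evict it, complete the app, then load the UI again.
  def scenario(applyFix: Boolean): Seq[String] = {
    val provider = new ToyHistoryProvider(applyFix)
    provider.writeLog("new1", Seq("ApplicationStart"), completed = false)
    provider.checkForLogs()
    provider.getAppUI("new1")     // store built from the in-progress snapshot
    provider.onUIDetached("new1") // evicted while still valid, store kept
    provider.writeLog("new1",
      Seq("ApplicationStart", "JobStart", "ApplicationEnd"), completed = true)
    provider.checkForLogs()
    provider.getAppUI("new1")
  }

  def main(args: Array[String]): Unit = {
    println(s"without fix: ${scenario(applyFix = false)}")
    println(s"with fix:    ${scenario(applyFix = true)}")
  }
}
```

In this model, the scenario without the fix returns only the `ApplicationStart` event captured in the in-progress snapshot, while the scenario with the fix returns all three events from the completed log.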
### Does this PR introduce _any_ user-facing change?
Yes. After this fix, users who access the History Server UI for an
application that completed after its UI was evicted from the `ApplicationCache`
will see the fully-completed application data (all jobs, stages, and the final
application end event), instead of a stale snapshot from when the UI was last
loaded while the app was still in progress.
### How was this patch tested?
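Added a test to `FsHistoryProviderSuite` that loads an in-progress app's UI,
simulates cache eviction, completes the app, and verifies that the reloaded UI
contains the job data from the completed log.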
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: GitHub Copilot
Closes #55578 from cxzl25/SPARK-56645.
Authored-by: cxzl25 <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../spark/deploy/history/FsHistoryProvider.scala | 17 ++++-
.../deploy/history/FsHistoryProviderSuite.scala | 77 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d166e61bfb82..79687bae0a10 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1118,7 +1118,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
         // In this case, we either didn't care about the end event, or we found it. So the
         // listing data is good.
-        invalidateUI(app.info.id, app.attempts.head.info.attemptId)
+        val appId = app.info.id
+        val attemptId = app.attempts.head.info.attemptId
+        invalidateUI(appId, attemptId)
+        // If the app has just completed, any existing disk store may have been built from an
+        // in-progress snapshot and is now stale. invalidateUI() above only handles the case
+        // where the UI is still tracked in activeUIs (i.e., still held in the ApplicationCache).
+        // If the ApplicationCache already evicted the UI entry (e.g., due to LRU pressure),
+        // the UI was removed from activeUIs before invalidateUI() was called, so the disk
+        // store was never marked for deletion. Proactively delete it here so that the next
+        // loadDiskStore() call rebuilds from the completed event log.
+        if (app.attempts.head.info.completed) {
+          val hasActiveUI = synchronized { activeUIs.contains((appId, attemptId)) }
+          if (!hasActiveUI) {
+            diskManager.foreach(_.release(appId, attemptId, delete = true))
+          }
+        }
         addListing(app)
         listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id),
           app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, reader.lastIndex,
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 829010179bda..16f9a4300db0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1035,6 +1035,83 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     freshUI.get.ui.store.job(0)
   }
+  test("stale disk store is rebuilt after app completes when UI was evicted from cache") {
+    // Test for the following race condition:
+    //
+    // 1. An in-progress app's UI is loaded -> disk store built from the .inprogress snapshot.
+    // 2. ApplicationCache evicts the UI entry (LRU pressure) -> onUIDetached() is called with
+    //    loadedUI.valid == true (app not yet complete) -> dm.release(delete=false) -> disk store
+    //    is kept on disk, entry removed from activeUIs.
+    // 3. App completes, checkForLogs() detects the completed log, mergeApplicationListing() calls
+    //    invalidateUI() but finds nothing in activeUIs -> the stale disk store is never deleted.
+    // 4. Next getAppUI() reopens the stale disk store from the .inprogress snapshot, missing
+    //    events written after the snapshot (e.g. JobStart, ApplicationEnd).
+    //
+    // The fix: when mergeApplicationListing() processes a completed log and the UI is not in
+    // activeUIs, proactively call dm.release(delete=true) to delete the stale disk store.
+    withTempDir { storeDir =>
+      val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
+      val provider = new FsHistoryProvider(conf)
+      val appId = "new1"
+
+      // Step 1: Write an in-progress log containing only ApplicationStart (no job).
+      val inProgressLog = newLogFile(appId, None, inProgress = true)
+      writeFile(inProgressLog, None,
+        SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None)
+      )
+      provider.checkForLogs()
+
+      // Step 2: Load the app UI; this builds the disk store from the in-progress snapshot.
+      val firstUI = provider.getAppUI(appId, None)
+      assert(firstUI.isDefined)
+      // No job exists in the in-progress snapshot.
+      intercept[NoSuchElementException] { firstUI.get.ui.store.job(0) }
+
+      // Step 3: Simulate ApplicationCache LRU eviction BEFORE the app completes.
+      // onUIDetached() is called with loadedUI.valid == true -> dm.release(delete=false):
+      // the disk store is kept but the entry is removed from activeUIs.
+      provider.onUIDetached(appId, None, firstUI.get.ui)
+
+      // Key invariant: after LRU eviction, valid is still true because the app has not
+      // completed yet. This is the heart of the bug: onUIDetached called
+      // dm.release(delete=false) because valid==true at eviction time.
+      assert(firstUI.get.valid)
+
+      // Step 4: Complete the app. Write a new log file (without .inprogress suffix) that
+      // contains ApplicationStart + JobStart + ApplicationEnd, and delete the old one.
+      val completedLog = newLogFile(appId, None, inProgress = false)
+      writeFile(completedLog, None,
+        SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None),
+        SparkListenerJobStart(0, 1L, Nil, null),
+        SparkListenerApplicationEnd(5L)
+      )
+      inProgressLog.delete()
+
+      // Step 5: checkForLogs() detects the completed log.
+      // With the fix, mergeApplicationListing() proactively deletes the stale disk store
+      // because activeUIs is empty (the entry was already evicted in step 3).
+      provider.checkForLogs()
+
+      // Step 6: Load the UI again.
+      // WITHOUT the fix: loadDiskStore() would find the old .ldb dir and reopen it ->
+      // stale snapshot -> no job data -> ui.store.job(0) throws NoSuchElementException.
+      // WITH the fix: the old disk store was deleted in step 5 -> rebuilt from the completed
+      // log -> job data is present.
+      val freshUI = provider.getAppUI(appId, None)
+      assert(freshUI.isDefined)
+
+      // The refreshed UI must contain job data from the completed log.
+      freshUI.get.ui.store.job(0)
+
+      // The attempt must be marked as completed in the listing.
+      val appInfo = provider.getListing().toSeq
+      assert(appInfo.size === 1)
+      assert(appInfo.head.attempts.head.completed)
+
+      provider.onUIDetached(appId, None, freshUI.get.ui)
+    }
+  }
+
   test("clean up stale app information") {
     withTempDir { storeDir =>
       val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())