This is an automated email from the ASF dual-hosted git repository.

mridulm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c082f824d4e2 [SPARK-56645][CORE] Fix History Server serving stale UI after app completes
c082f824d4e2 is described below

commit c082f824d4e2e837792fdbb2c381fb77a71ccd9f
Author: cxzl25 <[email protected]>
AuthorDate: Sat Jun 6 00:41:27 2026 -0500

    [SPARK-56645][CORE] Fix History Server serving stale UI after app completes
    
    ### What changes were proposed in this pull request?
    When `mergeApplicationListing()` successfully parses a completed event log, it now
    proactively deletes any existing disk store for the app if the app's UI is not currently
    tracked in `activeUIs`. Concretely, the following lines are added after the
    `invalidateUI()` call in `doMergeApplicationListingInternal`:
    
    ```scala
    if (app.attempts.head.info.completed) {
      val hasActiveUI = synchronized { activeUIs.contains((appId, attemptId)) }
      if (!hasActiveUI) {
        diskManager.foreach(_.release(appId, attemptId, delete = true))
      }
    }
    ```
    
    ### Why are the changes needed?
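    There is a race condition between `ApplicationCache`'s LRU eviction and
    `FsHistoryProvider.invalidateUI()` that causes the History Server to serve stale UI data
    after an in-progress app completes. If the cache evicts an in-progress app's UI, the UI is
    still valid at that moment, so `onUIDetached()` releases the disk store without deleting it
    and removes the entry from `activeUIs`. When the app later completes, `invalidateUI()` finds
    nothing in `activeUIs`, so the stale disk store (built from the `.inprogress` snapshot) is
    never deleted. The next UI load reopens it and misses every event written after that
    snapshot (e.g. later job starts and the application end event).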
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After this fix, users who open the History Server UI for an application that completed
    after its UI was evicted from the `ApplicationCache` will see the complete application data
    (all jobs, stages, and the final application end event). Previously they saw a stale snapshot
    taken when the UI was last loaded while the app was still in progress.
    
    ### How was this patch tested?
    Added a test to `FsHistoryProviderSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: GitHub Copilot
    
    Closes #55578 from cxzl25/SPARK-56645.
    
    Authored-by: cxzl25 <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 17 ++++-
 .../deploy/history/FsHistoryProviderSuite.scala    | 77 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d166e61bfb82..79687bae0a10 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1118,7 +1118,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
       case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
         // In this case, we either didn't care about the end event, or we 
found it. So the
         // listing data is good.
-        invalidateUI(app.info.id, app.attempts.head.info.attemptId)
+        val appId = app.info.id
+        val attemptId = app.attempts.head.info.attemptId
+        invalidateUI(appId, attemptId)
+        // If the app has just completed, any existing disk store may have 
been built from an
+        // in-progress snapshot and is now stale. invalidateUI() above only 
handles the case
+        // where the UI is still tracked in activeUIs (i.e., still held in the 
ApplicationCache).
+        // If the ApplicationCache already evicted the UI entry (e.g., due to 
LRU pressure),
+        // the UI was removed from activeUIs before invalidateUI() was called, 
so the disk
+        // store was never marked for deletion. Proactively delete it here so 
that the next
+        // loadDiskStore() call rebuilds from the completed event log.
+        if (app.attempts.head.info.completed) {
+          val hasActiveUI = synchronized { activeUIs.contains((appId, 
attemptId)) }
+          if (!hasActiveUI) {
+            diskManager.foreach(_.release(appId, attemptId, delete = true))
+          }
+        }
         addListing(app)
         listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, 
Some(app.info.id),
           app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, 
reader.lastIndex,
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 829010179bda..16f9a4300db0 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1035,6 +1035,83 @@ abstract class FsHistoryProviderSuite extends 
SparkFunSuite with Matchers with P
     freshUI.get.ui.store.job(0)
   }
 
+  test("stale disk store is rebuilt after app completes when UI was evicted 
from cache") {
+    // Test for the following race condition:
+    //
+    // 1. An in-progress app's UI is loaded -> disk store built from the 
.inprogress snapshot.
+    // 2. ApplicationCache evicts the UI entry (LRU pressure) -> 
onUIDetached() is called with
+    //    loadedUI.valid == true (app not yet complete) -> 
dm.release(delete=false) -> disk store
+    //    is kept on disk, entry removed from activeUIs.
+    // 3. App completes, checkForLogs() detects the completed log, 
mergeApplicationListing() calls
+    //    invalidateUI() but finds nothing in activeUIs -> the stale disk 
store is never deleted.
+    // 4. Next getAppUI() reopens the stale disk store from the .inprogress 
snapshot, missing
+    //    events written after the snapshot (e.g. JobStart, ApplicationEnd).
+    //
+    // The fix: when mergeApplicationListing() processes a completed log and 
the UI is not in
+    // activeUIs, proactively call dm.release(delete=true) to delete the stale 
disk store.
+    withTempDir { storeDir =>
+      val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+      val provider = new FsHistoryProvider(conf)
+      val appId = "new1"
+
+      // Step 1: Write an in-progress log containing only ApplicationStart (no 
job).
+      val inProgressLog = newLogFile(appId, None, inProgress = true)
+      writeFile(inProgressLog, None,
+        SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None)
+      )
+      provider.checkForLogs()
+
+      // Step 2: Load the app UI; this builds the disk store from the 
in-progress snapshot.
+      val firstUI = provider.getAppUI(appId, None)
+      assert(firstUI.isDefined)
+      // No job exists in the in-progress snapshot.
+      intercept[NoSuchElementException] { firstUI.get.ui.store.job(0) }
+
+      // Step 3: Simulate ApplicationCache LRU eviction BEFORE the app 
completes.
+      // onUIDetached() is called with loadedUI.valid == true -> 
dm.release(delete=false):
+      // the disk store is kept but the entry is removed from activeUIs.
+      provider.onUIDetached(appId, None, firstUI.get.ui)
+
+      // Key invariant: after LRU eviction, valid is still true because the 
app has not
+      // completed yet. This is the heart of the bug: onUIDetached called
+      // dm.release(delete=false) because valid==true at eviction time.
+      assert(firstUI.get.valid)
+
+      // Step 4: Complete the app. Write a new log file (without .inprogress 
suffix) that
+      // contains ApplicationStart + JobStart + ApplicationEnd, and delete the 
old one.
+      val completedLog = newLogFile(appId, None, inProgress = false)
+      writeFile(completedLog, None,
+        SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None),
+        SparkListenerJobStart(0, 1L, Nil, null),
+        SparkListenerApplicationEnd(5L)
+      )
+      inProgressLog.delete()
+
+      // Step 5: checkForLogs() detects the completed log.
+      // With the fix, mergeApplicationListing() proactively deletes the stale 
disk store
+      // because activeUIs is empty (the entry was already evicted in step 3).
+      provider.checkForLogs()
+
+      // Step 6: Load the UI again.
+      // WITHOUT the fix: loadDiskStore() would find the old .ldb dir and 
reopen it ->
+      //   stale snapshot -> no job data -> ui.store.job(0) throws 
NoSuchElementException.
+      // WITH the fix: the old disk store was deleted in step 5 -> rebuilt 
from the completed
+      //   log -> job data is present.
+      val freshUI = provider.getAppUI(appId, None)
+      assert(freshUI.isDefined)
+
+      // The refreshed UI must contain job data from the completed log.
+      freshUI.get.ui.store.job(0)
+
+      // The attempt must be marked as completed in the listing.
+      val appInfo = provider.getListing().toSeq
+      assert(appInfo.size === 1)
+      assert(appInfo.head.attempts.head.completed)
+
+      provider.onUIDetached(appId, None, freshUI.get.ui)
+    }
+  }
+
   test("clean up stale app information") {
     withTempDir { storeDir =>
       val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to