This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new eddf40d  [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during 
HistoryServerDiskManager initializing
eddf40d is described below

commit eddf40d5e48ada258dff45661816073cb39eb721
Author: Zhen Li <z...@microsoft.com>
AuthorDate: Wed Jul 8 21:58:45 2020 +0900

    [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during 
HistoryServerDiskManager initializing
    
    ### What changes were proposed in this pull request?
    
    Update ApplicationStoreInfo.size to real size during 
HistoryServerDiskManager initializing.
    
    ### Why are the changes needed?
    
    This PR is for fixing bug 
[32024](https://issues.apache.org/jira/browse/SPARK-32024). We found after 
history server restart, the following error would randomly happen: 
"java.lang.IllegalStateException: Disk usage tracker went negative (now = -***, 
delta = -***)" from `HistoryServerDiskManager`.
    
![Capture](https://user-images.githubusercontent.com/10524738/85034468-fda4ae80-b136-11ea-9011-f0c3e6508002.JPG)
    
    **Cause**: Reading data from level db would trigger table file compaction, 
which may also change the size of the level db directory.  This size change 
may not be recorded in LevelDB (`ApplicationStoreInfo` in `listing`). When 
service restarts, `currentUsage` is calculated from real directory size, but 
`ApplicationStoreInfo` entries are loaded from leveldb, so `currentUsage` may be less 
than the sum of `ApplicationStoreInfo.size`. In the `makeRoom()` function, 
`ApplicationStoreInfo.size` is used to [...]
    **Reproduce**: we can reproduce this issue in dev environment by reducing 
config value of "spark.history.retainedApplications" and 
"spark.history.store.maxDiskUsage" to some small values. Here are steps: 1. 
start history server, load some applications and access some pages (maybe 
"stages" page to trigger leveldb compaction). 2. restart HS, and refresh pages.
    I also added a UT to simulate this case in `HistoryServerDiskManagerSuite`.
    **Benefit**: this change would help improve history server reliability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Add unit test and manually tested it.
    
    Closes #28859 from 
zhli1142015/update-ApplicationStoreInfo.size-during-disk-manager-initialize.
    
    Authored-by: Zhen Li <z...@microsoft.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 8e7fc04637bbb8d2fdc2c758746e0eaf496c4d92)
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
---
 .../deploy/history/HistoryServerDiskManager.scala  | 21 ++++++++--
 .../history/HistoryServerDiskManagerSuite.scala    | 46 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index ad0dd23..dc17140 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -76,14 +76,29 @@ private class HistoryServerDiskManager(
 
     // Go through the recorded store directories and remove any that may have 
been removed by
     // external code.
-    val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { 
info =>
-      !new File(info.path).exists()
-    }.toSeq
+    val (existences, orphans) = listing
+      .view(classOf[ApplicationStoreInfo])
+      .asScala
+      .toSeq
+      .partition { info =>
+        new File(info.path).exists()
+      }
 
     orphans.foreach { info =>
       listing.delete(info.getClass(), info.path)
     }
 
+    // Reading level db would trigger table file compaction, then it may cause 
size of level db
+    // directory changed. When service restarts, "currentUsage" is calculated 
from real directory
+    // size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals
+    // sum of "ApplicationStoreInfo.size".
+    existences.foreach { info =>
+      val fileSize = sizeOf(new File(info.path))
+      if (fileSize != info.size) {
+        listing.write(info.copy(size = fileSize))
+      }
+    }
+
     logInfo("Initialized disk manager: " +
       s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
       s"max usage = ${Utils.bytesToString(maxUsage)}")
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index 4b1b921..a050519 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -157,4 +157,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite 
with BeforeAndAfter {
     assert(manager.approximateSize(50L, true) > 50L)
   }
 
+  test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
+    val manager = mockManager()
+    val leaseA = manager.lease(2)
+    doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath))
+    val dstA = leaseA.commit("app1", None)
+    assert(manager.free() === 0)
+    assert(manager.committed() === 3)
+    // Listing store tracks dstA now.
+    assert(store.read(classOf[ApplicationStoreInfo], 
dstA.getAbsolutePath).size === 3)
+
+    // Simulate: service restarts, new disk manager (manager1) is initialized.
+    val manager1 = mockManager()
+    // Simulate: event KVstore compaction before restart, directory size 
reduces.
+    doReturn(2L).when(manager1).sizeOf(meq(dstA))
+    doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps")))
+    manager1.initialize()
+    // "ApplicationStoreInfo.size" is updated for dstA.
+    assert(store.read(classOf[ApplicationStoreInfo], 
dstA.getAbsolutePath).size === 2)
+    assert(manager1.free() === 1)
+    // If "ApplicationStoreInfo.size" is not correctly updated, 
"IllegalStateException"
+    // would be thrown.
+    val leaseB = manager1.lease(2)
+    assert(manager1.free() === 1)
+    doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath))
+    val dstB = leaseB.commit("app2", None)
+    assert(manager1.committed() === 2)
+    // Listing store tracks dstB only, dstA is evicted by "makeRoom()".
+    assert(store.read(classOf[ApplicationStoreInfo], 
dstB.getAbsolutePath).size === 2)
+
+    val manager2 = mockManager()
+    // Simulate: cache entities are written after replaying, directory size 
increases.
+    doReturn(3L).when(manager2).sizeOf(meq(dstB))
+    doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
+    manager2.initialize()
+    // "ApplicationStoreInfo.size" is updated for dstB.
+    assert(store.read(classOf[ApplicationStoreInfo], 
dstB.getAbsolutePath).size === 3)
+    assert(manager2.free() === 0)
+    val leaseC = manager2.lease(2)
+    doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath))
+    val dstC = leaseC.commit("app3", None)
+    assert(manager2.free() === 1)
+    assert(manager2.committed() === 2)
+    // Listing store tracks dstC only, dstB is evicted by "makeRoom()".
+    assert(store.read(classOf[ApplicationStoreInfo], 
dstC.getAbsolutePath).size === 2)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to