This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new eddf40d [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing eddf40d is described below commit eddf40d5e48ada258dff45661816073cb39eb721 Author: Zhen Li <z...@microsoft.com> AuthorDate: Wed Jul 8 21:58:45 2020 +0900 [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing ### What changes were proposed in this pull request? Update ApplicationStoreInfo.size to real size during HistoryServerDiskManager initializing. ### Why are the changes needed? This PR is for fixing bug [32024](https://issues.apache.org/jira/browse/SPARK-32024). We found after history server restart, below error would randomly happen: "java.lang.IllegalStateException: Disk usage tracker went negative (now = -***, delta = -***)" from `HistoryServerDiskManager`. ![Capture](https://user-images.githubusercontent.com/10524738/85034468-fda4ae80-b136-11ea-9011-f0c3e6508002.JPG) **Cause**: Reading data from level db would trigger table file compaction, which may also trigger size of level db directory changes. This size change may not be recorded in LevelDB (`ApplicationStoreInfo` in `listing`). When service restarts, `currentUsage` is calculated from real directory size, but `ApplicationStoreInfo` are loaded from leveldb, then `currentUsage` may be less then sum of `ApplicationStoreInfo.size`. In `makeRoom()` function, `ApplicationStoreInfo.size` is used to [...] **Reproduce**: we can reproduce this issue in dev environment by reducing config value of "spark.history.retainedApplications" and "spark.history.store.maxDiskUsage" to some small values. Here are steps: 1. start history server, load some applications and access some pages (maybe "stages" page to trigger leveldb compaction). 2. restart HS, and refresh pages. I also added an UT to simulate this case in `HistoryServerDiskManagerSuite`. **Benefit**: this change would help improve history server reliability. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add unit test and manually tested it. Closes #28859 from zhli1142015/update-ApplicationStoreInfo.size-during-disk-manager-initialize. Authored-by: Zhen Li <z...@microsoft.com> Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> (cherry picked from commit 8e7fc04637bbb8d2fdc2c758746e0eaf496c4d92) Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> --- .../deploy/history/HistoryServerDiskManager.scala | 21 ++++++++-- .../history/HistoryServerDiskManagerSuite.scala | 46 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala index ad0dd23..dc17140 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala @@ -76,14 +76,29 @@ private class HistoryServerDiskManager( // Go through the recorded store directories and remove any that may have been removed by // external code. - val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info => - !new File(info.path).exists() - }.toSeq + val (existences, orphans) = listing + .view(classOf[ApplicationStoreInfo]) + .asScala + .toSeq + .partition { info => + new File(info.path).exists() + } orphans.foreach { info => listing.delete(info.getClass(), info.path) } + // Reading level db would trigger table file compaction, then it may cause size of level db + // directory changed. When service restarts, "currentUsage" is calculated from real directory + // size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals + // sum of "ApplicationStoreInfo.size". + existences.foreach { info => + val fileSize = sizeOf(new File(info.path)) + if (fileSize != info.size) { + listing.write(info.copy(size = fileSize)) + } + } + logInfo("Initialized disk manager: " + s"current usage = ${Utils.bytesToString(currentUsage.get())}, " + s"max usage = ${Utils.bytesToString(maxUsage)}") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala index 4b1b921..a050519 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala @@ -157,4 +157,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter { assert(manager.approximateSize(50L, true) > 50L) } + test("SPARK-32024: update ApplicationStoreInfo.size during initializing") { + val manager = mockManager() + val leaseA = manager.lease(2) + doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath)) + val dstA = leaseA.commit("app1", None) + assert(manager.free() === 0) + assert(manager.committed() === 3) + // Listing store tracks dstA now. + assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 3) + + // Simulate: service restarts, new disk manager (manager1) is initialized. + val manager1 = mockManager() + // Simulate: event KVstore compaction before restart, directory size reduces. + doReturn(2L).when(manager1).sizeOf(meq(dstA)) + doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps"))) + manager1.initialize() + // "ApplicationStoreInfo.size" is updated for dstA. + assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 2) + assert(manager1.free() === 1) + // If "ApplicationStoreInfo.size" is not correctly updated, "IllegalStateException" + // would be thrown. + val leaseB = manager1.lease(2) + assert(manager1.free() === 1) + doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath)) + val dstB = leaseB.commit("app2", None) + assert(manager1.committed() === 2) + // Listing store tracks dstB only, dstA is evicted by "makeRoom()". + assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 2) + + val manager2 = mockManager() + // Simulate: cache entities are written after replaying, directory size increases. + doReturn(3L).when(manager2).sizeOf(meq(dstB)) + doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps"))) + manager2.initialize() + // "ApplicationStoreInfo.size" is updated for dstB. + assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 3) + assert(manager2.free() === 0) + val leaseC = manager2.lease(2) + doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath)) + val dstC = leaseC.commit("app3", None) + assert(manager2.free() === 1) + assert(manager2.committed() === 2) + // Listing store tracks dstC only, dstB is evicted by "makeRoom()". + assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2) + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org