This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 08640961e3b [SPARK-45472][SS] RocksDB State Store Doesn't Need to Recheck checkpoint path existence
08640961e3b is described below

commit 08640961e3bad7de38ed3358df8706bad028c27a
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Tue Oct 10 11:24:31 2023 +0900

    [SPARK-45472][SS] RocksDB State Store Doesn't Need to Recheck checkpoint path existence
    
    ### What changes were proposed in this pull request?
    In RocksDBFileManager, we add a variable to indicate that the root path has already been checked (and created if it did not exist), so that we don't need to recheck it a second time.
    
    ### Why are the changes needed?
    Right now, every time RocksDB.load() is called, we check checkpoint directory existence and create it if not. This is relatively expensive and shows up in performance profiling.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing CI tests to cover it.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43299 from siying/rootPath.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/state/RocksDBFileManager.scala   | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index eae9aac3c0a..3d0745c2fb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -146,10 +146,15 @@ class RocksDBFileManager(
 
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
+  @volatile private var rootDirChecked: Boolean = false
+
   def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
-    val rootDir = new Path(dfsRootDir)
     val changelogFile = dfsChangelogFile(version)
-    if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
+    if (!rootDirChecked) {
+      val rootDir = new Path(dfsRootDir)
+      if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
+      rootDirChecked = true
+    }
     val changelogWriter = new StateStoreChangelogWriter(fm, changelogFile, codec)
     changelogWriter
   }
@@ -193,8 +198,11 @@ class RocksDBFileManager(
       // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
       // Moreover, once we disable to track the number of keys, in which the numKeys is -1, we
       // still need to create the initial dfs root directory anyway.
-      val path = new Path(dfsRootDir)
-      if (!fm.exists(path)) fm.mkdirs(path)
+      if (!rootDirChecked) {
+        val path = new Path(dfsRootDir)
+        if (!fm.exists(path)) fm.mkdirs(path)
+        rootDirChecked = true
+      }
     }
     zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
     logInfo(s"Saved checkpoint file for version $version")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to