This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ce5d7280624 [SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote load
2ce5d7280624 is described below

commit 2ce5d7280624961c57806fe950b5a3ec88075160
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Jul 17 10:21:28 2024 +0900

    [SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote load
    
    ### What changes were proposed in this pull request?
    Set the RocksDB last snapshot version correctly on remote load
    
    ### Why are the changes needed?
    Avoid creating a full snapshot on the first batch after every restart, and also discard the pending latest snapshot, which is likely no longer valid after a remote load.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests
    ```
    ===== POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: 
ForkJoinPool.commonPool-worker-6 (daemon=true), 
ForkJoinPool.commonPool-worker-4 (daemon=true), 
ForkJoinPool.commonPool-worker-7 (daemon=true), 
ForkJoinPool.commonPool-worker-5 (daemon=true), 
ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), 
ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), 
ForkJoinPool.commonPool-worker-1 (daemon=true) [...]
    [info] Run completed in 4 minutes, 40 seconds.
    [info] Total number of tests run: 176
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47363 from anishshri-db/task/SPARK-48903.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../spark/sql/execution/streaming/state/RocksDB.scala     | 10 +++++++++-
 .../sql/execution/streaming/state/RocksDBSuite.scala      | 15 +++++++++++----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c748e5346719..2f3f5a57261f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -189,8 +189,16 @@ class RocksDB(
         // reset last snapshot version
         if (lastSnapshotVersion > latestSnapshotVersion) {
           // discard any newer snapshots
-          lastSnapshotVersion = 0L
+          synchronized {
+            if (latestSnapshot.isDefined) {
+              oldSnapshots += latestSnapshot.get
+              latestSnapshot = None
+            }
+          }
         }
+
+        // reset the last snapshot version to the latest available snapshot 
version
+        lastSnapshotVersion = latestSnapshotVersion
         openDB()
 
         numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 5b3911df4606..90331b8a098f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -306,11 +306,19 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
       assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
     }
 
+    // pick up from the last snapshot and the next upload will be for version 
21
     withDB(remoteDir, conf = conf) { db =>
       db.load(18)
       db.commit()
       db.doMaintenance()
-      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 19))
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
+
+      for (version <- 19 to 20) {
+        db.load(version)
+        db.commit()
+      }
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 21))
     }
   }
 
@@ -1645,7 +1653,6 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
 
   testWithChangelogCheckpointingEnabled("time travel 5 -" +
     "validate successful RocksDB load when metadata file is not overwritten") {
-    // Ensure commit doesn't modify the latestSnapshot that doMaintenance will 
upload
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +
       "NoOverwriteFileSystemBasedCheckpointFileManager"
     withTempDir { dir =>
@@ -1663,9 +1670,9 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
         db.load(0)
         db.put("a", "1")
 
-        // upload version 1 snapshot created above
+        // do not upload version 1 snapshot created previously
         db.doMaintenance()
-        assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+        assert(snapshotVersionsPresent(remoteDir) == Seq.empty)
 
         db.commit() // create snapshot again
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to