This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2ce5d7280624 [SPARK-48903][SS] Set the RocksDB last snapshot version
correctly on remote load
2ce5d7280624 is described below
commit 2ce5d7280624961c57806fe950b5a3ec88075160
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Wed Jul 17 10:21:28 2024 +0900
[SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote
load
### What changes were proposed in this pull request?
Set the RocksDB last snapshot version correctly on remote load
### Why are the changes needed?
Avoid creating a full snapshot on the first batch after every restart, and
also discard a pending snapshot that is likely no longer valid
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests
```
===== POSSIBLE THREAD LEAK IN SUITE
o.a.s.sql.execution.streaming.state.RocksDBSuite, threads:
ForkJoinPool.commonPool-worker-6 (daemon=true),
ForkJoinPool.commonPool-worker-4 (daemon=true),
ForkJoinPool.commonPool-worker-7 (daemon=true),
ForkJoinPool.commonPool-worker-5 (daemon=true),
ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true),
ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true),
ForkJoinPool.commonPool-worker-1 (daemon=true) [...]
[info] Run completed in 4 minutes, 40 seconds.
[info] Total number of tests run: 176
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47363 from anishshri-db/task/SPARK-48903.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/execution/streaming/state/RocksDB.scala | 10 +++++++++-
.../sql/execution/streaming/state/RocksDBSuite.scala | 15 +++++++++++----
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c748e5346719..2f3f5a57261f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -189,8 +189,16 @@ class RocksDB(
// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
- lastSnapshotVersion = 0L
+ synchronized {
+ if (latestSnapshot.isDefined) {
+ oldSnapshots += latestSnapshot.get
+ latestSnapshot = None
+ }
+ }
}
+
+ // reset the last snapshot version to the latest available snapshot version
+ lastSnapshotVersion = latestSnapshotVersion
openDB()
numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 5b3911df4606..90331b8a098f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -306,11 +306,19 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
}
+ // pick up from the last snapshot and the next upload will be for version 21
withDB(remoteDir, conf = conf) { db =>
db.load(18)
db.commit()
db.doMaintenance()
- assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 19))
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18))
+
+ for (version <- 19 to 20) {
+ db.load(version)
+ db.commit()
+ }
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) === Seq(3, 6, 18, 21))
}
}
@@ -1645,7 +1653,6 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
testWithChangelogCheckpointingEnabled("time travel 5 -" +
"validate successful RocksDB load when metadata file is not overwritten") {
- // Ensure commit doesn't modify the latestSnapshot that doMaintenance will upload
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
"NoOverwriteFileSystemBasedCheckpointFileManager"
withTempDir { dir =>
@@ -1663,9 +1670,9 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
db.load(0)
db.put("a", "1")
- // upload version 1 snapshot created above
+ // do not upload version 1 snapshot created previously
db.doMaintenance()
- assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+ assert(snapshotVersionsPresent(remoteDir) == Seq.empty)
db.commit() // create snapshot again
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]