This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c0fbc6ba7d2d [SPARK-51675][SS] Fix col family creation after opening
local DB to avoid snapshot creation, if not necessary
c0fbc6ba7d2d is described below
commit c0fbc6ba7d2d8866d4266bcdf75729c5c2decd40
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Apr 1 15:42:49 2025 +0900
[SPARK-51675][SS] Fix col family creation after opening local DB to avoid
snapshot creation, if not necessary
### What changes were proposed in this pull request?
Create the default column family after opening the local DB, so that snapshot
creation is not forced when it is not necessary.
### Why are the changes needed?
Without this, we might force snapshot creation where it's not required.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Fixed unit tests
```
[info] Run completed in 2 minutes, 1 second.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50471 from anishshri-db/task/SPARK-51675.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 4 +++-
.../execution/streaming/state/RocksDBSuite.scala | 22 +++-------------------
2 files changed, 6 insertions(+), 20 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index d4bf9d31617b..85bb77b7afb3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -532,11 +532,13 @@ class RocksDB(
maxColumnFamilyId.set(maxId)
}
+ openDB()
+ // Call this after opening the DB to ensure that forcing snapshot is not
triggered
+ // unnecessarily.
if (useColumnFamilies) {
createColFamilyIfAbsent(StateStore.DEFAULT_COL_FAMILY_NAME, isInternal =
false)
}
- openDB()
val (numKeys, numInternalKeys) = {
if (!conf.trackTotalNumberOfRows) {
// we don't track the total number of rows - discard the number being
track
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 9a79f3fa0ae8..b5a2ec43001a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -949,13 +949,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.commit()
}
- if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
- // This is because 30 is executed twice and snapshot does not
overwrite in checkpoint v2
- assert(snapshotVersionsPresent(remoteDir) === (1 to 30) :+ 30 :+ 31)
- } else {
- assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
- }
-
+ assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
assert(changelogVersionsPresent(remoteDir) === (30 to 60))
for (version <- 1 to 60) {
db.load(version, versionToUniqueId.get(version), readOnly = true)
@@ -972,20 +966,10 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
// Check that snapshots and changelogs get purged correctly.
db.doMaintenance()
- // Behavior is slightly different when column families are enabled with
checkpoint v2
- // since snapshot version 31 was created previously.
- if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
- assert(snapshotVersionsPresent(remoteDir) === Seq(31, 60, 60))
- } else {
- assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
- }
+ assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
if (enableStateStoreCheckpointIds) {
// recommit version 60 creates another changelog file with different
unique id
- if (colFamiliesEnabled) {
- assert(changelogVersionsPresent(remoteDir) === (31 to 60) :+ 60)
- } else {
- assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
- }
+ assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
} else {
assert(changelogVersionsPresent(remoteDir) === (30 to 60))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]