This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cf487b9f81fb [SPARK-49363][SS][TESTS] Add unit tests for potential 
RocksDB state store SST file mismatch
cf487b9f81fb is described below

commit cf487b9f81fb8b95d0596111187b863dc9037855
Author: Siying Dong <[email protected]>
AuthorDate: Fri Aug 30 11:24:20 2024 +0900

    [SPARK-49363][SS][TESTS] Add unit tests for potential RocksDB state store 
SST file mismatch
    
    ### What changes were proposed in this pull request?
    Add unit tests for RocksDB state store snapshot checkpointing for 
changelog. We intentionally add the same content in each batch, so that it is 
likely that SST files generated are all of the same size. We have some 
randomness on reloading the existing version or moving to the next, and whether 
the maintenance task is executed. All three tests would fail for previous versions 
but not in master.
    
    ### Why are the changes needed?
    Recently we discovered some RocksDB state store file version ID mismatch 
issues. Although it happens to have been fixed by other change, we don't have 
test coverage for it. Add unit tests for them.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Run the tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47850 from siying/cp_test.
    
    Authored-by: Siying Dong <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../execution/streaming/state/RocksDBSuite.scala   | 133 +++++++++++++++++++++
 1 file changed, 133 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 90b7c2604076..d07ce07c41e5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.language.implicitConversions
+import scala.util.Random
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -1770,6 +1771,138 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithChangelogCheckpointingEnabled("reloading the same version") {
+    // Keep executing the same batch for two or more times. Some queries with 
ForEachBatch
+    // will cause this behavior.
+    // The test was accidentally fixed by SPARK-48586 
(https://github.com/apache/spark/pull/47130)
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
+    withDB(remoteDir, conf = conf) { db =>
+      // load the same version of pending snapshot uploading
+      // This is possible because after committing version x, we can continue 
to x+1, and replay
+      // x+1. The replay will load a checkpoint by version x. At this moment, 
the snapshot
+      // uploading may not be finished.
+      // Previously this generated a problem: new files generated by reloading 
are added to
+      // local -> cloud file map and the information is used to skip some 
files uploading, which is
+      // wrong because these files aren't a part of the RocksDB checkpoint.
+      // This test was accidentally fixed by
+      // SPARK-48931 (https://github.com/apache/spark/pull/47393)
+
+      db.load(0)
+      db.put("foo", "bar")
+      // Snapshot checkpoint not needed
+      db.commit()
+
+      // Continue using local DB
+      db.load(1)
+      db.put("foo", "bar")
+      // Should create a local RocksDB snapshot
+      db.commit()
+      // Upload the local RocksDB snapshot to the cloud with 2.zip
+      db.doMaintenance()
+
+      // This will reload Db from the cloud.
+      db.load(1)
+      db.put("foo", "bar")
+      // Should create another local snapshot
+      db.commit()
+
+      // Continue using local DB
+      db.load(2)
+      db.put("foo", "bar")
+      // Snapshot checkpoint not needed
+      db.commit()
+
+      // Reload DB from the cloud, loading from 2.zip
+      db.load(2)
+      db.put("foo", "bar")
+      // Snapshot checkpoint not needed
+      db.commit()
+
+      // Will upload local snapshot and overwrite 2.zip
+      db.doMaintenance()
+
+      // Reload new 2.zip just uploaded to validate it is not corrupted.
+      db.load(2)
+      db.put("foo", "bar")
+      db.commit()
+
+      // Test the maintenance thread is delayed even after the next snapshot 
is created.
+      // There will be two outstanding snapshots.
+      for (batchVersion <- 3 to 6) {
+        db.load(batchVersion)
+        db.put("foo", "bar")
+        // In batchVersion 3 and 5, it will generate a local snapshot, which 
won't be uploaded.
+        db.commit()
+      }
+      db.doMaintenance()
+
+      // Test the maintenance is called after each batch. This tests a common 
case where
+      // maintenance tasks finish quickly.
+      for (batchVersion <- 7 to 10) {
+        for (j <- 0 to 1) {
+          db.load(batchVersion)
+          db.put("foo", "bar")
+          db.commit()
+          db.doMaintenance()
+        }
+      }
+    }
+  }
+
+  for (randomSeed <- 1 to 8) {
+    for (ifTestSkipBatch <- 0 to 1) {
+      testWithChangelogCheckpointingEnabled(
+        s"randomized snapshotting $randomSeed ifTestSkipBatch 
$ifTestSkipBatch") {
+        // The unit test simulates the case where batches can be reloaded and 
maintenance tasks
+        // can be delayed. After each batch, we randomly decide whether we 
would move onto the
+        // next batch, and whether the maintenance task is executed.
+        val remoteDir = Utils.createTempDir().toString
+        val conf = dbConf.copy(minDeltasForSnapshot = 3, compactOnCommit = 
false)
+        new File(remoteDir).delete() // to make sure that the directory gets 
created
+        withDB(remoteDir, conf = conf) { db =>
+          // A second DB is opened to simulate another executor that runs some 
batches that
+          // skipped in the current DB.
+          withDB(remoteDir, conf = conf) { db2 =>
+            val random = new Random(randomSeed)
+            var curVer: Int = 0
+            for (i <- 1 to 100) {
+              db.load(curVer)
+              db.put("foo", "bar")
+              db.commit()
+              // For a one in five chance, maintenance task is executed. The 
chance is created to
+              // simulate the case where snapshot isn't immediatelly uploaded, 
and even delayed
+              // so that the next snapshot is ready. We create a snapshot in 
every 3 batches, so
+              // with 1/5 chance, it is more likely to create longer 
maintenance delay.
+              if (random.nextInt(5) == 0) {
+                db.doMaintenance()
+              }
+              // For half the chance, we move to the next version, and half 
the chance we keep the
+              // same version. When the same version is kept, the DB will be 
reloaded.
+              if (random.nextInt(2) == 0) {
+                val inc = if (ifTestSkipBatch == 1) {
+                  random.nextInt(3)
+                } else {
+                  1
+                }
+                if (inc > 1) {
+                  // Create changelog files in the gap
+                  for (j <- 1 to inc - 1) {
+                    db2.load(curVer + j)
+                    db2.put("foo", "bar")
+                    db2.commit()
+                  }
+                }
+                curVer = curVer + inc
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("validate Rocks DB SST files do not have a VersionIdMismatch" +
     " when metadata file is not overwritten - scenario 1") {
     val fmClass = "org.apache.spark.sql.execution.streaming.state." +


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to