This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5a91b21e037 [SPARK-38277][SS] Clear write batch after RocksDB state store's commit
5a91b21e037 is described below

commit 5a91b21e037e4f11e34b584c384f8704ae0b081c
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Dec 8 15:51:40 2022 +0900

    [SPARK-38277][SS] Clear write batch after RocksDB state store's commit
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to clear the write batch (and the corresponding prefix iterators) after
    a commit has succeeded on the RocksDB state store. As a side effect, this PR also fixes
    the test case, which had been relying on the "sort of bug" that the write batch was not
    cleaned up until either rollback or load was called.
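    
    A minimal sketch of the commit-side cleanup pattern this change introduces. The class and
    parameter names below (`StateStoreLike`, `pushChanges`, `release`) are illustrative
    stand-ins rather than the actual RocksDB.scala code; clearing the WriteBatchWithIndex and
    closing the RocksIterators is roughly what `resetWriteBatch()` and
    `closePrefixScanIterators()` in the diff below are expected to do:
    
    ```scala
    import scala.collection.mutable
    import org.rocksdb.{RocksIterator, WriteBatchWithIndex}
    
    // Simplified view of the commit path: whether the commit succeeded or failed,
    // drop the batched writes and prefix iterators so they are not retained until
    // the next load()/rollback().
    class StateStoreLike(
        writeBatch: WriteBatchWithIndex,
        prefixScanIterators: mutable.Set[RocksIterator]) {
    
      def commit(pushChanges: () => Unit, release: () => Unit): Unit = {
        try {
          pushChanges() // write the batch into RocksDB and checkpoint it
        } finally {
          // Either the changes were committed or this version is invalidated;
          // in both cases the in-memory batch is no longer needed.
          prefixScanIterators.foreach(_.close())
          prefixScanIterators.clear()
          writeBatch.clear() // frees the writes accumulated by this task
          release()
        }
      }
    }
    ```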
    
    ### Why are the changes needed?
    
    Without this, the memory usage of WriteBatch for the RocksDB state store "accumulates"
    across the partitions handled by the same executor. Say 10 partitions of a stateful
    operator are assigned to one executor and run sequentially. Since we did not clear the
    write batch after commit, by the time the executor processes the last of its partitions,
    10 WriteBatch instances together hold all of the writes performed in this micro-batch.
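    
    As a rough, hypothetical illustration of the accumulation (the partition count and
    per-partition batch size below are made-up numbers, not measurements):
    
    ```scala
    // 10 partitions run one after another on the same executor; without the fix,
    // each keeps its WriteBatchWithIndex populated after commit.
    val partitions = 10
    val bytesPerBatch = 50L * 1024 * 1024                   // assume ~50 MB of batched writes each
    
    val retainedWithoutClear = partitions * bytesPerBatch   // ~500 MB held at once
    val retainedWithClear = bytesPerBatch                   // ~50 MB peak: only the live batch
    ```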
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a sort of bugfix.
    
    ### How was this patch tested?
    
    Existing tests, with the test case fixed as described above.
    
    Closes #38880 from HeartSaVioR/SPARK-38277.
    
    Lead-authored-by: Jungtaek Lim <[email protected]>
    Co-authored-by: Yun Tang <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
    (cherry picked from commit 9eabe67a693c28509dda25b3e5998eb7ca7a3aa9)
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala  | 7 ++++++-
 .../apache/spark/sql/execution/streaming/state/RocksDBSuite.scala | 8 ++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 66e14f6bff1..fbec5d3b598 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -132,7 +132,8 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
-      // reset resources to prevent side-effects from previous loaded version
+      // reset resources to prevent side-effects from previous loaded version if it was not cleaned
+      // up correctly
       closePrefixScanIterators()
       resetWriteBatch()
       logInfo(s"Loaded $version")
@@ -318,6 +319,10 @@ class RocksDB(
     } finally {
       db.continueBackgroundWork()
       silentDeleteRecursively(checkpointDir, s"committing $newVersion")
+      // reset resources as either 1) we already pushed the changes and it has been committed or
+      // 2) commit has failed and the current version is "invalidated".
+      closePrefixScanIterators()
+      resetWriteBatch()
       release()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 75717d27687..9a0a1a55b88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -118,7 +118,11 @@ class RocksDBSuite extends SparkFunSuite {
     withDB(remoteDir, conf = conf) { db =>
       // Generate versions without cleaning up
       for (version <- 1 to 50) {
-        db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...
+        if (version > 1) {
+          // remove keys we wrote in previous iteration to ensure compaction happens
+          db.remove((version - 1).toString)
+        }
+        db.put(version.toString, version.toString)
         db.commit()
       }
 
@@ -134,7 +138,7 @@ class RocksDBSuite extends SparkFunSuite {
       versionsPresent.foreach { version =>
         db.load(version)
         val data = db.iterator().map(toStr).toSet
-        assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet)
+        assert(data === Set((version.toString, version.toString)))
       }
     }
   }

