This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 1e4dc0d8cbd5 [SPARK-51724][SS] RocksDB StateStore's lineage manager should be synchronized
1e4dc0d8cbd5 is described below

commit 1e4dc0d8cbd50f8ff535c6294e9c1350dc44087a
Author: Siying Dong <dong...@gmail.com>
AuthorDate: Tue Apr 8 22:14:54 2025 +0900

    [SPARK-51724][SS] RocksDB StateStore's lineage manager should be synchronized
    
    ### Why are the changes needed?
    RocksDB State Store's lineage manager currently isn't synchronized, but it can be accessed by both the DB loading thread and the maintenance thread. In theory, this can produce a wrong lineage:
    1. The maintenance thread gets a copy of the current lineage.
    2. A task's commit() appends a new item to the lineage.
    3. The maintenance thread truncates its now-stale copy and stores it back.
    
    In this case, the lineage item added in step 2 is lost.
    This is fixed by synchronizing those operations, so that reading, truncating, and writing back the lineage happens atomically with respect to appends.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Ran existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #50520 from siying/lineagemanagerrace.
    
    Authored-by: Siying Dong <dong...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 187adb83fae9c8bb5f6e27fd121e457aabbb78b6)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/state/RocksDB.scala    | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 9822ee0be0e0..6cd06e14f74f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1462,8 +1462,7 @@ class RocksDB(
       // This is relative aggressive because that even if the uploading 
succeeds,
       // it is not necessarily the one written to the commit log. But we can 
always load lineage
       // from commit log so it is fine.
-      lineageManager.resetLineage(lineageManager.getLineageForCurrVersion()
-        .filter(i => i.version >= snapshot.version))
+      lineageManager.truncateFromVersion(snapshot.version)
       logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: " +
         log"Upload snapshot of version ${MDC(LogKeys.VERSION_NUM, 
snapshot.version)}, " +
         log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " +
@@ -1975,27 +1974,33 @@ case class AcquiredThreadInfo(
 private[sql] class RocksDBLineageManager {
   @volatile private var lineage: Array[LineageItem] = Array.empty
 
-  override def toString: String = lineage.map {
-    case LineageItem(version, uuid) => s"$version: $uuid"
-  }.mkString(" ")
+  override def toString: String = synchronized {
+    lineage.map {
+      case LineageItem(version, uuid) => s"$version: $uuid"
+    }.mkString(" ")
+  }
 
-  def appendLineageItem(item: LineageItem): Unit = {
+  def appendLineageItem(item: LineageItem): Unit = synchronized {
     lineage = lineage :+ item
   }
 
-  def resetLineage(newLineage: Array[LineageItem]): Unit = {
+  def truncateFromVersion(versionToKeep: Long): Unit = synchronized {
+    resetLineage(getLineageForCurrVersion().filter(i => i.version >= 
versionToKeep))
+  }
+
+  def resetLineage(newLineage: Array[LineageItem]): Unit = synchronized {
     lineage = newLineage
   }
 
-  def getLineageForCurrVersion(): Array[LineageItem] = {
+  def getLineageForCurrVersion(): Array[LineageItem] = synchronized {
     lineage.clone()
   }
 
-  def contains(item: LineageItem): Boolean = {
+  def contains(item: LineageItem): Boolean = synchronized {
     lineage.contains(item)
   }
 
-  def clear(): Unit = {
+  def clear(): Unit = synchronized {
     lineage = Array.empty
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to