This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 1e4dc0d8cbd5 [SPARK-51724][SS] RocksDB StateStore's lineage manager should be synchronized 1e4dc0d8cbd5 is described below commit 1e4dc0d8cbd50f8ff535c6294e9c1350dc44087a Author: Siying Dong <dong...@gmail.com> AuthorDate: Tue Apr 8 22:14:54 2025 +0900 [SPARK-51724][SS] RocksDB StateStore's lineage manager should be synchronized ### Why are the changes needed? RocksDB State Store's Lineage Manager currently isn't synchronized, but it can be accessed by both the DB loading thread and the maintenance thread. In theory, this can produce a wrong lineage: 1. the maintenance thread gets a copy of the current lineage; 2. a task's commit() appends a new item to the lineage; 3. the maintenance thread truncates its (now stale) copy and stores it back, overwriting the lineage. In this case, the new lineage item added in step 2 is lost. It should be fixed by simply synchronizing those operations, so that the read-truncate-write sequence in steps 1 and 3 happens atomically with respect to appends. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Ran existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50520 from siying/lineagemanagerrace. 
Authored-by: Siying Dong <dong...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 187adb83fae9c8bb5f6e27fd121e457aabbb78b6) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/execution/streaming/state/RocksDB.scala | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 9822ee0be0e0..6cd06e14f74f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -1462,8 +1462,7 @@ class RocksDB( // This is relative aggressive because that even if the uploading succeeds, // it is not necessarily the one written to the commit log. But we can always load lineage // from commit log so it is fine. 
- lineageManager.resetLineage(lineageManager.getLineageForCurrVersion() - .filter(i => i.version >= snapshot.version)) + lineageManager.truncateFromVersion(snapshot.version) logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: " + log"Upload snapshot of version ${MDC(LogKeys.VERSION_NUM, snapshot.version)}, " + log"with uniqueId: ${MDC(LogKeys.UUID, snapshot.uniqueId)} " + @@ -1975,27 +1974,33 @@ case class AcquiredThreadInfo( private[sql] class RocksDBLineageManager { @volatile private var lineage: Array[LineageItem] = Array.empty - override def toString: String = lineage.map { - case LineageItem(version, uuid) => s"$version: $uuid" - }.mkString(" ") + override def toString: String = synchronized { + lineage.map { + case LineageItem(version, uuid) => s"$version: $uuid" + }.mkString(" ") + } - def appendLineageItem(item: LineageItem): Unit = { + def appendLineageItem(item: LineageItem): Unit = synchronized { lineage = lineage :+ item } - def resetLineage(newLineage: Array[LineageItem]): Unit = { + def truncateFromVersion(versionToKeep: Long): Unit = synchronized { + resetLineage(getLineageForCurrVersion().filter(i => i.version >= versionToKeep)) + } + + def resetLineage(newLineage: Array[LineageItem]): Unit = synchronized { lineage = newLineage } - def getLineageForCurrVersion(): Array[LineageItem] = { + def getLineageForCurrVersion(): Array[LineageItem] = synchronized { lineage.clone() } - def contains(item: LineageItem): Boolean = { + def contains(item: LineageItem): Boolean = synchronized { lineage.contains(item) } - def clear(): Unit = { + def clear(): Unit = synchronized { lineage = Array.empty } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org