This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ff6a52426f5 [SPARK-48886][SS] Add version info to changelog v2 to allow for easier evolution
5ff6a52426f5 is described below

commit 5ff6a52426f53df4b6fbd3c1e27386df872d7cbe
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue Jul 16 07:57:17 2024 +0900

    [SPARK-48886][SS] Add version info to changelog v2 to allow for easier evolution
    
    ### What changes were proposed in this pull request?
    Add version info to changelog v2 to allow for easier evolution
    
    ### Why are the changes needed?
    Currently the changelog file format does not record any version info. With
    format v2, we propose writing the version into the changelog file itself so
    that the format can evolve more easily in the future.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Augmented unit tests
    ```
    ===== POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.RocksDBSuite, threads: 
ForkJoinPool.commonPool-worker-6 (daemon=true), 
ForkJoinPool.commonPool-worker-4 (daemon=true), 
ForkJoinPool.commonPool-worker-7 (daemon=true), 
ForkJoinPool.commonPool-worker-5 (daemon=true), 
ForkJoinPool.commonPool-worker-3 (daemon=true), rpc-boss-3-1 (daemon=true), 
ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (daemon=true), 
ForkJoinPool.commonPool-worker-1 (daemon=true) [...]
    [info] Run completed in 4 minutes, 23 seconds.
    [info] Total number of tests run: 176
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 176, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #47336 from anishshri-db/task/SPARK-48886.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../streaming/state/RocksDBFileManager.scala       | 43 ++++++++++++++++++----
 .../streaming/state/StateStoreChangelog.scala      | 38 ++++++++++++++++---
 .../execution/streaming/state/RocksDBSuite.scala   |  5 +++
 3 files changed, 72 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index fe7aeeb6fd3f..6c8db12635fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -171,6 +171,15 @@ class RocksDBFileManager(
     fileMappings
   }
 
+  private def getChangelogVersion(useColumnFamilies: Boolean): Short = {
+    val changelogVersion: Short = if (useColumnFamilies) {
+      2
+    } else {
+      1
+    }
+    changelogVersion
+  }
+
   def getChangeLogWriter(
       version: Long,
       useColumnFamilies: Boolean = false): StateStoreChangelogWriter = {
@@ -180,10 +189,16 @@ class RocksDBFileManager(
       if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
       rootDirChecked = true
     }
-    val changelogWriter = if (useColumnFamilies) {
-      new StateStoreChangelogWriterV2(fm, changelogFile, codec)
-    } else {
-      new StateStoreChangelogWriterV1(fm, changelogFile, codec)
+
+    val changelogVersion = getChangelogVersion(useColumnFamilies)
+    val changelogWriter = changelogVersion match {
+      case 1 =>
+        new StateStoreChangelogWriterV1(fm, changelogFile, codec)
+      case 2 =>
+        new StateStoreChangelogWriterV2(fm, changelogFile, codec)
+      case _ =>
+        throw new IllegalArgumentException(s"Failed to find changelog writer 
for " +
+          s"version=$changelogVersion")
     }
     changelogWriter
   }
@@ -193,11 +208,23 @@ class RocksDBFileManager(
       version: Long,
       useColumnFamilies: Boolean = false): StateStoreChangelogReader = {
     val changelogFile = dfsChangelogFile(version)
-    if (useColumnFamilies) {
-      new StateStoreChangelogReaderV2(fm, changelogFile, codec)
-    } else {
-      new StateStoreChangelogReaderV1(fm, changelogFile, codec)
+
+    // Note that ideally we should get the version for the reader from the
+    // changelog itself. However, since we don't record this for v1, we need to
+    // rely on external arguments to make this call today. Within the reader, 
we verify
+    // for the correctness of the decided/expected version. We might revisit 
this pattern
+    // as we add more changelog versions in the future.
+    val changelogVersion = getChangelogVersion(useColumnFamilies)
+    val changelogReader = changelogVersion match {
+      case 1 =>
+        new StateStoreChangelogReaderV1(fm, changelogFile, codec)
+      case 2 =>
+        new StateStoreChangelogReaderV2(fm, changelogFile, codec)
+      case _ =>
+        throw new IllegalArgumentException(s"Failed to find changelog reader 
for " +
+          s"version=$changelogVersion")
     }
+    changelogReader
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 9c8803c8a17c..651d72da1609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -93,10 +93,16 @@ abstract class StateStoreChangelogWriter(
     new DataOutputStream(compressed)
   }
 
+  protected def writeVersion(): Unit = {
+    compressedStream.writeUTF(s"v${version}")
+  }
+
   protected var backingFileStream: CancellableFSDataOutputStream =
     fm.createAtomic(file, overwriteIfPossible = true)
   protected var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
 
+  def version: Short
+
   def put(key: Array[Byte], value: Array[Byte]): Unit
 
   def delete(key: Array[Byte]): Unit
@@ -140,6 +146,9 @@ class StateStoreChangelogWriterV1(
     compressionCodec: CompressionCodec)
   extends StateStoreChangelogWriter(fm, file, compressionCodec) {
 
+  // Note that v1 does not record this value in the changelog file
+  override def version: Short = 1
+
   override def put(key: Array[Byte], value: Array[Byte]): Unit = {
     assert(compressedStream != null)
     compressedStream.writeInt(key.size)
@@ -180,13 +189,13 @@ class StateStoreChangelogWriterV1(
 
 /**
  * Write changes to the key value state store instance to a changelog file.
- * There are 2 types of data records, put and delete.
- * A put record is written as: | record type | key length
- *    | key content | value length | value content | col family name length | 
col family name | -1 |
+ * There are 3 types of data records, put, merge and delete.
+ * A put record or merge record is written as: | record type | key length
+ *    | key content | value length | value content | -1 |
  * A delete record is written as: | record type | key length | key content | -1
- *    | col family name length | col family name | -1 |
  * Write an EOF_RECORD to signal the end of file.
- * The overall changelog format is: | put record | delete record | ... | put 
record | eof record |
+ * The overall changelog format is:  version | put record | delete record
+ *                                   | ... | put record | eof record |
  */
 class StateStoreChangelogWriterV2(
     fm: CheckpointFileManager,
@@ -194,6 +203,11 @@ class StateStoreChangelogWriterV2(
     compressionCodec: CompressionCodec)
   extends StateStoreChangelogWriter(fm, file, compressionCodec) {
 
+  override def version: Short = 2
+
+  // append the version field to the changelog file starting from version 2
+  writeVersion()
+
   override def put(key: Array[Byte], value: Array[Byte]): Unit = {
     writePutOrMergeRecord(key, value, RecordType.PUT_RECORD)
   }
@@ -265,6 +279,8 @@ abstract class StateStoreChangelogReader(
   }
   protected val input: DataInputStream = decompressStream(sourceStream)
 
+  def version: Short
+
   override protected def close(): Unit = { if (input != null) input.close() }
 
   override def getNext(): (RecordType.Value, Array[Byte], Array[Byte])
@@ -283,6 +299,9 @@ class StateStoreChangelogReaderV1(
     compressionCodec: CompressionCodec)
   extends StateStoreChangelogReader(fm, fileToRead, compressionCodec) {
 
+  // Note that v1 does not record this value in the changelog file
+  override def version: Short = 1
+
   override def getNext(): (RecordType.Value, Array[Byte], Array[Byte]) = {
     val keySize = input.readInt()
     // A -1 key size mean end of file.
@@ -314,7 +333,7 @@ class StateStoreChangelogReaderV1(
  * Read an iterator of change record from the changelog file.
  * A record is represented by tuple(recordType: RecordType.Value,
  * key: Array[Byte], value: Array[Byte])
- * A put record is returned as a tuple(recordType, key, value)
+ * A put or merge record is returned as a tuple(recordType, key, value)
  * A delete record is return as a tuple(recordType, key, null)
  */
 class StateStoreChangelogReaderV2(
@@ -330,6 +349,13 @@ class StateStoreChangelogReaderV2(
     blockBuffer
   }
 
+  override def version: Short = 2
+
+  // ensure that the version read is v2
+  val changelogVersionStr = input.readUTF()
+  assert(changelogVersionStr == "v2",
+    s"Changelog version mismatch: $changelogVersionStr != v2")
+
   override def getNext(): (RecordType.Value, Array[Byte], Array[Byte]) = {
     val recordType = RecordType.getRecordTypeFromByte(input.readByte())
     // A EOF_RECORD means end of file.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 5e9a5ecf44fc..5b3911df4606 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -591,6 +591,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     val fileManager = new RocksDBFileManager(
       dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
     val changelogWriter = fileManager.getChangeLogWriter(1)
+    assert(changelogWriter.version === 1)
 
     val ex = intercept[UnsupportedOperationException] {
       changelogWriter.merge("a", "1")
@@ -639,12 +640,14 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     val fileManager = new RocksDBFileManager(
       dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
     val changelogWriter = fileManager.getChangeLogWriter(1)
+    assert(changelogWriter.version === 1)
 
     (1 to 5).foreach(i => changelogWriter.put(i.toString, i.toString))
     (2 to 4).foreach(j => changelogWriter.delete(j.toString))
 
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
+    assert(changelogReader.version === 1)
     val entries = changelogReader.toSeq
     val expectedEntries = (1 to 5).map { i =>
       (RecordType.PUT_RECORD, i.toString.getBytes,
@@ -666,6 +669,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     val fileManager = new RocksDBFileManager(
       dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
     val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    assert(changelogWriter.version === 2)
     (1 to 5).foreach { i =>
       changelogWriter.put(i.toString, i.toString)
     }
@@ -679,6 +683,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
 
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1, true)
+    assert(changelogReader.version === 2)
     val entries = changelogReader.toSeq
     val expectedEntries = (1 to 5).map { i =>
       (RecordType.PUT_RECORD, i.toString.getBytes, i.toString.getBytes)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to