This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 205e382d5643 [SPARK-50652][SS] Add checks to RocksDB V2 backward 
compatibility
205e382d5643 is described below

commit 205e382d56434ef156e67382b8925c70ad77c174
Author: Wei Liu <[email protected]>
AuthorDate: Sat Jan 18 06:44:21 2025 +0900

    [SPARK-50652][SS] Add checks to RocksDB V2 backward compatibility
    
    ### What changes were proposed in this pull request?
    
    Currently at the RocksDB level, v2 -> v1 backward compatibility is not 
supported. This PR adds an exact version check at the commit log level for 
this purpose.
    
    ### Why are the changes needed?
    
    For compatibility check
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Modified test cases and filed follow-ups
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #49275 from WweiL/commit-log-version-followup.
    
    Authored-by: Wei Liu <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../src/main/resources/error/error-conditions.json | 24 +++++++++
 .../scala/org/apache/spark/SparkException.scala    | 19 +++++++
 .../spark/sql/errors/QueryExecutionErrors.scala    | 28 ++++++++++
 .../spark/sql/execution/streaming/CommitLog.scala  |  4 +-
 .../sql/execution/streaming/HDFSMetadataLog.scala  |  3 ++
 .../execution/streaming/MetadataVersionUtil.scala  | 62 ++++++++++++++--------
 .../spark/sql/streaming/CommitLogSuite.scala       | 27 +++++++---
 7 files changed, 135 insertions(+), 32 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index d949ece67903..2506c20453ae 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2870,6 +2870,24 @@
     },
     "sqlState" : "42K0E"
   },
+  "INVALID_LOG_VERSION" : {
+    "message" : [
+      "UnsupportedLogVersion."
+    ],
+    "subClass" : {
+      "EXACT_MATCH_VERSION" : {
+        "message" : [
+          "The only supported log version is v<matchVersion>, but encountered 
v<version>."
+        ]
+      },
+      "MAX_SUPPORTED_VERSION" : {
+        "message" : [
+          "The maximum supported log version is v<maxSupportedVersion>, but 
encountered v<version>. The log file was produced by a newer version of Spark 
and cannot be read by this version. You need to upgrade."
+        ]
+      }
+    },
+    "sqlState" : "KD002"
+  },
   "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
     "message" : [
       "The operator expects a deterministic expression, but the actual 
expression is <sqlExprs>."
@@ -3627,6 +3645,12 @@
     ],
     "sqlState" : "KD000"
   },
+  "MALFORMED_LOG_FILE" : {
+    "message" : [
+      "Log file was malformed: failed to read correct log version from <text>."
+    ],
+    "sqlState" : "KD002"
+  },
   "MALFORMED_PROTOBUF_MESSAGE" : {
     "message" : [
       "Malformed Protobuf messages are detected in message deserialization. 
Parse Mode: <failFastMode>. To process malformed protobuf message as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala 
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 0c0a1902ee2a..00989fd29095 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -452,6 +452,25 @@ private[spark] class SparkIllegalArgumentException private(
   override def getQueryContext: Array[QueryContext] = context
 }
 
+/**
+ * IllegalStateException thrown from Spark with an error class.
+ */
+private[spark] class SparkIllegalStateException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    context: Array[QueryContext] = Array.empty,
+    cause: Throwable = null)
+  extends IllegalStateException(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters), cause)
+    with SparkThrowable {
+
+  override def getMessageParameters: java.util.Map[String, String] = 
messageParameters.asJava
+
+  override def getCondition: String = errorClass
+
+  override def getQueryContext: Array[QueryContext] = context
+}
+
 private[spark] class SparkRuntimeException private(
     message: String,
     cause: Option[Throwable],
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1ae2e5445c0c..69518b548653 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2631,6 +2631,34 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
       cause = null)
   }
 
+  def malformedLogFile(text: String): Throwable = {
+    new SparkIllegalStateException(
+      errorClass = "MALFORMED_LOG_FILE",
+      messageParameters = Map("text" -> text),
+      cause = null
+    )
+  }
+
+  def logVersionGreaterThanSupported(version: Int, maxSupportedVersion: Int): 
Throwable = {
+    new SparkIllegalStateException(
+      errorClass = "INVALID_LOG_VERSION.MAX_SUPPORTED_VERSION",
+      messageParameters = Map(
+        "version" -> version.toString,
+        "maxSupportedVersion" -> maxSupportedVersion.toString),
+      cause = null
+    )
+  }
+
+  def logVersionNotMatch(version: Int, matchVersion: Int): Throwable = {
+    new SparkIllegalStateException(
+      errorClass = "INVALID_LOG_VERSION.EXACT_MATCH_VERSION",
+      messageParameters = Map(
+        "version" -> version.toString,
+        "matchVersion" -> matchVersion.toString),
+      cause = null
+    )
+  }
+
   def invalidChangeLogReaderVersion(version: Long): Throwable = {
     new SparkException(
       errorClass = "CANNOT_LOAD_STATE_STORE.INVALID_CHANGE_LOG_READER_VERSION",
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index a7fcaa5ccbcb..f501b62b8a71 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -61,7 +61,9 @@ class CommitLog(sparkSession: SparkSession, path: String)
       throw new IllegalStateException("Incomplete log file in the offset 
commit log")
     }
     // TODO [SPARK-49462] This validation should be relaxed for a stateless 
query.
-    validateVersion(lines.next().trim, VERSION)
+    // TODO [SPARK-50653] This validation should be relaxed to support reading
+    //  a V1 log file when VERSION is V2
+    validateVersionExactMatch(lines.next().trim, VERSION)
     val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
     CommitMetadata(metadataJson)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 251cc16acdf4..423648f44590 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -357,6 +357,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
 
   private[sql] def validateVersion(text: String, maxSupportedVersion: Int): 
Int =
     MetadataVersionUtil.validateVersion(text, maxSupportedVersion)
+
+  private[sql] def validateVersionExactMatch(text: String, matchVersion: Int): 
Int =
+    MetadataVersionUtil.validateVersionExactMatch(text, matchVersion)
 }
 
 object HDFSMetadataLog {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
index 548f2aa5d5c5..854e36b5304a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
@@ -17,35 +17,51 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.sql.errors.QueryExecutionErrors
+
 object MetadataVersionUtil {
   /**
    * Parse the log version from the given `text` -- will throw exception when 
the parsed version
-   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as 
"xyz", "v", "v-1",
-   * "v123xyz" etc.)
+   * exceeds `maxSupportedVersion`, or when `text` is malformed.
    */
   def validateVersion(text: String, maxSupportedVersion: Int): Int = {
-    if (text.length > 0 && text(0) == 'v') {
-      val version =
-        try {
-          text.substring(1, text.length).toInt
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalStateException(s"Log file was malformed: failed 
to read correct log " +
-              s"version from $text.")
-        }
-      if (version > 0) {
-        if (version > maxSupportedVersion) {
-          throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
-            s"is v${maxSupportedVersion}, but encountered v$version. The log 
file was produced " +
-            s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")
-        } else {
-          return version
-        }
-      }
+    val version: Int = extractVersion(text)
+    if (version > maxSupportedVersion) {
+      throw QueryExecutionErrors.logVersionGreaterThanSupported(version, 
maxSupportedVersion)
     }
+    version
+  }
 
-    // reaching here means we failed to read the correct log version
-    throw new IllegalStateException(s"Log file was malformed: failed to read 
correct log " +
-      s"version from $text.")
+  /**
+   * Parse the log version from the given `text` -- will throw an exception when
+   * the parsed version does not equal `matchVersion`, or when `text` is malformed.
+   */
+  def validateVersionExactMatch(text: String, matchVersion: Int): Int = {
+    val version: Int = extractVersion(text)
+    if (version != matchVersion) {
+      throw QueryExecutionErrors.logVersionNotMatch(version, matchVersion)
+    }
+    version
+  }
+
+  /**
+   * Parse the log version from the given `text` -- will throw an exception
+   * when `text` is malformed (such as "xyz", "v", "v-1", "v123xyz" etc.)
+   */
+  private def extractVersion(text: String): Int = {
+    val version: Int = if (text.nonEmpty && text(0) == 'v') {
+      try {
+        text.substring(1, text.length).toInt
+      } catch {
+        case _: NumberFormatException =>
+          throw QueryExecutionErrors.malformedLogFile(text)
+      }
+    } else {
+      throw QueryExecutionErrors.malformedLogFile(text)
+    }
+    if (version <= 0) {
+      throw QueryExecutionErrors.malformedLogFile(text)
+    }
+    version
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
index ea2f4154170a..1af35d0a1db4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
@@ -117,13 +117,24 @@ class CommitLogSuite extends SparkFunSuite with 
SharedSparkSession {
 
   // Old metadata structure with no state unique ids should not affect the 
deserialization
   test("Cross-version V1 SerDe") {
-    val commitlogV1 = """v1
-                        |{"nextBatchWatermarkMs":233}""".stripMargin
-    val inputStream: ByteArrayInputStream =
-      new ByteArrayInputStream(commitlogV1.getBytes("UTF-8"))
-    val commitMetadata: CommitMetadata = new CommitLog(
-      spark, testCommitLogV1FilePath.toString).deserialize(inputStream)
-    assert(commitMetadata.nextBatchWatermarkMs === 233)
-    assert(commitMetadata.stateUniqueIds === None)
+    withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") {
+      val commitlogV1 = """v1
+                          |{"nextBatchWatermarkMs":233}""".stripMargin
+      val inputStream: ByteArrayInputStream =
+        new ByteArrayInputStream(commitlogV1.getBytes("UTF-8"))
+
+      // TODO [SPARK-50653]: Uncomment the below when v2 -> v1 backward 
compatibility is added
+      // val commitMetadata: CommitMetadata = new CommitLog(
+      // spark, testCommitLogV1FilePath.toString).deserialize(inputStream)
+      // assert(commitMetadata.nextBatchWatermarkMs === 233)
+      // assert(commitMetadata.stateUniqueIds === Map.empty)
+
+      // TODO [SPARK-50653]: remove the below when v2 -> v1 backward 
compatibility is added
+      val e = intercept[IllegalStateException] {
+        new CommitLog(spark, 
testCommitLogV1FilePath.toString).deserialize(inputStream)
+      }
+
+      assert (e.getMessage.contains("only supported log version"))
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to