This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 9fbc3dc76f97 [SPARK-50652][SS] Add checks to RocksDB V2 backward
compatibility
9fbc3dc76f97 is described below
commit 9fbc3dc76f97b04af8b56de171662e08f382b321
Author: Wei Liu <[email protected]>
AuthorDate: Sat Jan 18 06:44:21 2025 +0900
[SPARK-50652][SS] Add checks to RocksDB V2 backward compatibility
### What changes were proposed in this pull request?
Currently at the RocksDB level, v2 -> v1 backward compatibility is not
supported. This PR adds an exact version check at the commit log level for
this purpose.
### Why are the changes needed?
For compatibility check
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified test cases and filed follow-ups
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49275 from WweiL/commit-log-version-followup.
Authored-by: Wei Liu <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 205e382d56434ef156e67382b8925c70ad77c174)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 24 +++++++++
.../scala/org/apache/spark/SparkException.scala | 19 +++++++
.../spark/sql/errors/QueryExecutionErrors.scala | 28 ++++++++++
.../spark/sql/execution/streaming/CommitLog.scala | 4 +-
.../sql/execution/streaming/HDFSMetadataLog.scala | 3 ++
.../execution/streaming/MetadataVersionUtil.scala | 62 ++++++++++++++--------
.../spark/sql/streaming/CommitLogSuite.scala | 27 +++++++---
7 files changed, 135 insertions(+), 32 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index d949ece67903..2506c20453ae 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -2870,6 +2870,24 @@
},
"sqlState" : "42K0E"
},
+ "INVALID_LOG_VERSION" : {
+ "message" : [
+ "UnsupportedLogVersion."
+ ],
+ "subClass" : {
+ "EXACT_MATCH_VERSION" : {
+ "message" : [
+ "The only supported log version is v<matchVersion>, but encountered
v<version>."
+ ]
+ },
+ "MAX_SUPPORTED_VERSION" : {
+ "message" : [
+ "The maximum supported log version is v<maxSupportedVersion>, but
encountered v<version>. The log file was produced by a newer version of Spark
and cannot be read by this version. You need to upgrade."
+ ]
+ }
+ },
+ "sqlState" : "KD002"
+ },
"INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
"message" : [
"The operator expects a deterministic expression, but the actual
expression is <sqlExprs>."
@@ -3627,6 +3645,12 @@
],
"sqlState" : "KD000"
},
+ "MALFORMED_LOG_FILE" : {
+ "message" : [
+ "Log file was malformed: failed to read correct log version from <text>."
+ ],
+ "sqlState" : "KD002"
+ },
"MALFORMED_PROTOBUF_MESSAGE" : {
"message" : [
"Malformed Protobuf messages are detected in message deserialization.
Parse Mode: <failFastMode>. To process malformed protobuf message as null
result, try setting the option 'mode' as 'PERMISSIVE'."
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 0c0a1902ee2a..00989fd29095 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -452,6 +452,25 @@ private[spark] class SparkIllegalArgumentException private(
override def getQueryContext: Array[QueryContext] = context
}
+/**
+ * IllegalStateException thrown from Spark with an error class.
+ */
+private[spark] class SparkIllegalStateException(
+ errorClass: String,
+ messageParameters: Map[String, String],
+ context: Array[QueryContext] = Array.empty,
+ cause: Throwable = null)
+ extends IllegalStateException(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters), cause)
+ with SparkThrowable {
+
+ override def getMessageParameters: java.util.Map[String, String] =
messageParameters.asJava
+
+ override def getCondition: String = errorClass
+
+ override def getQueryContext: Array[QueryContext] = context
+}
+
private[spark] class SparkRuntimeException private(
message: String,
cause: Option[Throwable],
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1ae2e5445c0c..69518b548653 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2631,6 +2631,34 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
cause = null)
}
+ def malformedLogFile(text: String): Throwable = {
+ new SparkIllegalStateException(
+ errorClass = "MALFORMED_LOG_FILE",
+ messageParameters = Map("text" -> text),
+ cause = null
+ )
+ }
+
+ def logVersionGreaterThanSupported(version: Int, maxSupportedVersion: Int):
Throwable = {
+ new SparkIllegalStateException(
+ errorClass = "INVALID_LOG_VERSION.MAX_SUPPORTED_VERSION",
+ messageParameters = Map(
+ "version" -> version.toString,
+ "maxSupportedVersion" -> maxSupportedVersion.toString),
+ cause = null
+ )
+ }
+
+ def logVersionNotMatch(version: Int, matchVersion: Int): Throwable = {
+ new SparkIllegalStateException(
+ errorClass = "INVALID_LOG_VERSION.EXACT_MATCH_VERSION",
+ messageParameters = Map(
+ "version" -> version.toString,
+ "matchVersion" -> matchVersion.toString),
+ cause = null
+ )
+ }
+
def invalidChangeLogReaderVersion(version: Long): Throwable = {
new SparkException(
errorClass = "CANNOT_LOAD_STATE_STORE.INVALID_CHANGE_LOG_READER_VERSION",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index a7fcaa5ccbcb..f501b62b8a71 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -61,7 +61,9 @@ class CommitLog(sparkSession: SparkSession, path: String)
throw new IllegalStateException("Incomplete log file in the offset
commit log")
}
// TODO [SPARK-49462] This validation should be relaxed for a stateless
query.
- validateVersion(lines.next().trim, VERSION)
+ // TODO [SPARK-50653] This validation should be relaxed to support reading
+ // a V1 log file when VERSION is V2
+ validateVersionExactMatch(lines.next().trim, VERSION)
val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
CommitMetadata(metadataJson)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 251cc16acdf4..423648f44590 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -357,6 +357,9 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession:
SparkSession, path:
private[sql] def validateVersion(text: String, maxSupportedVersion: Int):
Int =
MetadataVersionUtil.validateVersion(text, maxSupportedVersion)
+
+ private[sql] def validateVersionExactMatch(text: String, matchVersion: Int):
Int =
+ MetadataVersionUtil.validateVersionExactMatch(text, matchVersion)
}
object HDFSMetadataLog {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
index 548f2aa5d5c5..854e36b5304a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala
@@ -17,35 +17,51 @@
package org.apache.spark.sql.execution.streaming
+import org.apache.spark.sql.errors.QueryExecutionErrors
+
object MetadataVersionUtil {
/**
* Parse the log version from the given `text` -- will throw exception when
the parsed version
- * exceeds `maxSupportedVersion`, or when `text` is malformed (such as
"xyz", "v", "v-1",
- * "v123xyz" etc.)
+ * exceeds `maxSupportedVersion`, or when `text` is malformed.
*/
def validateVersion(text: String, maxSupportedVersion: Int): Int = {
- if (text.length > 0 && text(0) == 'v') {
- val version =
- try {
- text.substring(1, text.length).toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalStateException(s"Log file was malformed: failed
to read correct log " +
- s"version from $text.")
- }
- if (version > 0) {
- if (version > maxSupportedVersion) {
- throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
- s"is v${maxSupportedVersion}, but encountered v$version. The log
file was produced " +
- s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
- } else {
- return version
- }
- }
+ val version: Int = extractVersion(text)
+ if (version > maxSupportedVersion) {
+ throw QueryExecutionErrors.logVersionGreaterThanSupported(version,
maxSupportedVersion)
}
+ version
+ }
- // reaching here means we failed to read the correct log version
- throw new IllegalStateException(s"Log file was malformed: failed to read
correct log " +
- s"version from $text.")
+ /**
+ * Parse the log version from the given `text` -- will throw exception when
the parsed version
+ * does not equal to `matchVersion`, or when `text` is malformed.
+ */
+ def validateVersionExactMatch(text: String, matchVersion: Int): Int = {
+ val version: Int = extractVersion(text)
+ if (version != matchVersion) {
+ throw QueryExecutionErrors.logVersionNotMatch(version, matchVersion)
+ }
+ version
+ }
+
+ /**
+ * Parse the log version from the given `text` -- will throw exception when
the parsed version
+ * when `text` is malformed (such as "xyz", "v", "v-1", "v123xyz" etc.)
+ */
+ private def extractVersion(text: String): Int = {
+ val version: Int = if (text.nonEmpty && text(0) == 'v') {
+ try {
+ text.substring(1, text.length).toInt
+ } catch {
+ case _: NumberFormatException =>
+ throw QueryExecutionErrors.malformedLogFile(text)
+ }
+ } else {
+ throw QueryExecutionErrors.malformedLogFile(text)
+ }
+ if (version <= 0) {
+ throw QueryExecutionErrors.malformedLogFile(text)
+ }
+ version
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
index ea2f4154170a..1af35d0a1db4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala
@@ -117,13 +117,24 @@ class CommitLogSuite extends SparkFunSuite with
SharedSparkSession {
// Old metadata structure with no state unique ids should not affect the
deserialization
test("Cross-version V1 SerDe") {
- val commitlogV1 = """v1
- |{"nextBatchWatermarkMs":233}""".stripMargin
- val inputStream: ByteArrayInputStream =
- new ByteArrayInputStream(commitlogV1.getBytes("UTF-8"))
- val commitMetadata: CommitMetadata = new CommitLog(
- spark, testCommitLogV1FilePath.toString).deserialize(inputStream)
- assert(commitMetadata.nextBatchWatermarkMs === 233)
- assert(commitMetadata.stateUniqueIds === None)
+ withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") {
+ val commitlogV1 = """v1
+ |{"nextBatchWatermarkMs":233}""".stripMargin
+ val inputStream: ByteArrayInputStream =
+ new ByteArrayInputStream(commitlogV1.getBytes("UTF-8"))
+
+ // TODO [SPARK-50653]: Uncomment the below when v2 -> v1 backward
compatibility is added
+ // val commitMetadata: CommitMetadata = new CommitLog(
+ // spark, testCommitLogV1FilePath.toString).deserialize(inputStream)
+ // assert(commitMetadata.nextBatchWatermarkMs === 233)
+ // assert(commitMetadata.stateUniqueIds === Map.empty)
+
+ // TODO [SPARK-50653]: remove the below when v2 -> v1 backward
compatibility is added
+ val e = intercept[IllegalStateException] {
+ new CommitLog(spark,
testCommitLogV1FilePath.toString).deserialize(inputStream)
+ }
+
+ assert (e.getMessage.contains("only supported log version"))
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]