This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c38844c9ecc6 [SPARK-49687][SQL] Delay sorting in `validateAndMaybeEvolveStateSchema`
c38844c9ecc6 is described below
commit c38844c9ecc6dd648500b2ef6ff01acbe46255f4
Author: Zhihong Yu <[email protected]>
AuthorDate: Tue Sep 17 10:58:05 2024 -0700
[SPARK-49687][SQL] Delay sorting in `validateAndMaybeEvolveStateSchema`
### What changes were proposed in this pull request?
In `validateAndMaybeEvolveStateSchema`, the existing schema and the new schema
are both sorted by column family name up front.
The sorting can be delayed until `createSchemaFile` is called.
When computing `colFamiliesAddedOrRemoved`, we can use `toSet` to compare the
column family names as sets instead of comparing the sorted name lists.
### Why are the changes needed?
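This makes `validateAndMaybeEvolveStateSchema` faster: the schema list is now
sorted only on the paths that actually call `createSchemaFile`, rather than on
every invocation.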
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #48116 from tedyu/ty-comp-chk.
Authored-by: Zhihong Yu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../streaming/state/StateSchemaCompatibilityChecker.scala | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 3a1793f71794..721d72b6a099 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -168,12 +168,12 @@ class StateSchemaCompatibilityChecker(
newStateSchema: List[StateStoreColFamilySchema],
ignoreValueSchema: Boolean,
stateSchemaVersion: Int): Boolean = {
- val existingStateSchemaList =
getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
- val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+ val existingStateSchemaList = getExistingKeyAndValueSchema()
+ val newStateSchemaList = newStateSchema
if (existingStateSchemaList.isEmpty) {
// write the schema file if it doesn't exist
- createSchemaFile(newStateSchemaList, stateSchemaVersion)
+ createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName),
stateSchemaVersion)
true
} else {
// validate if the new schema is compatible with the existing schema
@@ -188,9 +188,9 @@ class StateSchemaCompatibilityChecker(
}
}
val colFamiliesAddedOrRemoved =
- newStateSchemaList.map(_.colFamilyName) !=
existingStateSchemaList.map(_.colFamilyName)
+ (newStateSchemaList.map(_.colFamilyName).toSet !=
existingSchemaMap.keySet)
if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved)
{
- createSchemaFile(newStateSchemaList, stateSchemaVersion)
+ createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName),
stateSchemaVersion)
}
// TODO: [SPARK-49535] Write Schema files after schema has changed for
StateSchemaV3
colFamiliesAddedOrRemoved
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]