This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c38844c9ecc6 [SPARK-49687][SQL] Delay sorting in 
`validateAndMaybeEvolveStateSchema`
c38844c9ecc6 is described below

commit c38844c9ecc6dd648500b2ef6ff01acbe46255f4
Author: Zhihong Yu <[email protected]>
AuthorDate: Tue Sep 17 10:58:05 2024 -0700

    [SPARK-49687][SQL] Delay sorting in `validateAndMaybeEvolveStateSchema`
    
    ### What changes were proposed in this pull request?
    In `validateAndMaybeEvolveStateSchema`, the existing schema and the new schema 
are both sorted by column family name up front.
    The sorting can be delayed until `createSchemaFile` is called.
    When computing `colFamiliesAddedOrRemoved`, we can use `toSet` to compare 
the column family names independently of order, so no sorting is needed there.
    
    ### Why are the changes needed?
    This makes `validateAndMaybeEvolveStateSchema` faster by skipping the sort whenever no schema file is written.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #48116 from tedyu/ty-comp-chk.
    
    Authored-by: Zhihong Yu <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../streaming/state/StateSchemaCompatibilityChecker.scala      | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 3a1793f71794..721d72b6a099 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -168,12 +168,12 @@ class StateSchemaCompatibilityChecker(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
       stateSchemaVersion: Int): Boolean = {
-    val existingStateSchemaList = 
getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
-    val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+    val existingStateSchemaList = getExistingKeyAndValueSchema()
+    val newStateSchemaList = newStateSchema
 
     if (existingStateSchemaList.isEmpty) {
       // write the schema file if it doesn't exist
-      createSchemaFile(newStateSchemaList, stateSchemaVersion)
+      createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), 
stateSchemaVersion)
       true
     } else {
       // validate if the new schema is compatible with the existing schema
@@ -188,9 +188,9 @@ class StateSchemaCompatibilityChecker(
         }
       }
       val colFamiliesAddedOrRemoved =
-        newStateSchemaList.map(_.colFamilyName) != 
existingStateSchemaList.map(_.colFamilyName)
+        (newStateSchemaList.map(_.colFamilyName).toSet != 
existingSchemaMap.keySet)
       if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) 
{
-        createSchemaFile(newStateSchemaList, stateSchemaVersion)
+        createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), 
stateSchemaVersion)
       }
       // TODO: [SPARK-49535] Write Schema files after schema has changed for 
StateSchemaV3
       colFamiliesAddedOrRemoved


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to