This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e94af58f16 [SPARK-48898][SQL] Fix Variant shredding bug
98e94af58f16 is described below

commit 98e94af58f1614e74b040057e71bd6402b85e67c
Author: cashmand <[email protected]>
AuthorDate: Tue Dec 3 23:08:22 2024 +0800

    [SPARK-48898][SQL] Fix Variant shredding bug
    
    ### What changes were proposed in this pull request?
    
    In VariantShreddingWriter, there are two calls to
    `variantBuilder.appendVariant` that were left over from an earlier version
    of the shredding spec where we constructed new metadata for every shredded
    value. This method rebuilds the Variant value to refer to the new metadata
    dictionary in the builder, so we should not be using it in shredding, where
    all dictionary IDs now refer to the original Variant metadata.
    
    1) When writing a Variant value that does not match the shredding type. The
       code was doing the right thing, but unnecessarily calling
       `variantBuilder.appendVariant` and then discarding the result. The PR
       removes that dead code.
    2) When reconstructing a Variant object that contains only the fields of
       the original object that don't appear in the shredding schema. This is a
       correctness bug, since we would modify the value to use new dictionary
       IDs that do not correspond to the ones in the original metadata.
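    
    As an illustration of case 2, the sketch below is a toy model and not the
    Spark implementation: an object is reduced to an array of field IDs that
    index into a metadata dictionary of key names. The class
    `DictionaryIdSketch` and its methods `resolve`, `rebuildIds`, and
    `shallowCopyIds` are hypothetical names made up for this example; they only
    mimic the difference between a copy that re-registers keys in a fresh
    dictionary and a copy that leaves the IDs unchanged.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Toy model only (assumption): a Variant object is represented by the
    // dictionary IDs of its field keys. None of this is Spark API.
    class DictionaryIdSketch {

        // Look up each field ID in the given metadata dictionary.
        static List<String> resolve(int[] fieldIds, List<String> metadata) {
            List<String> keys = new ArrayList<>();
            for (int id : fieldIds) {
                keys.add(metadata.get(id));
            }
            return keys;
        }

        // Deep-style copy: every key is re-registered in newDictionary, so
        // the returned IDs are only meaningful relative to newDictionary.
        static int[] rebuildIds(int[] fieldIds, List<String> metadata,
                                List<String> newDictionary) {
            int[] out = new int[fieldIds.length];
            for (int i = 0; i < fieldIds.length; i++) {
                String key = metadata.get(fieldIds[i]);
                int newId = newDictionary.indexOf(key);
                if (newId < 0) {
                    newDictionary.add(key);
                    newId = newDictionary.size() - 1;
                }
                out[i] = newId;
            }
            return out;
        }

        // Shallow-style copy: the IDs are copied as-is and therefore still
        // refer to the original metadata.
        static int[] shallowCopyIds(int[] fieldIds) {
            return fieldIds.clone();
        }

        public static void main(String[] args) {
            List<String> originalMetadata = Arrays.asList("a", "b", "c");
            int[] residual = {1, 2}; // the fields "b" and "c"

            int[] rebuilt = rebuildIds(residual, originalMetadata, new ArrayList<>());
            int[] shallow = shallowCopyIds(residual);

            // The shredded row keeps the original metadata, so that is the
            // dictionary a reader would use to resolve the residual value.
            System.out.println(resolve(rebuilt, originalMetadata)); // prints [a, b]
            System.out.println(resolve(shallow, originalMetadata)); // prints [b, c]
        }
    }
    ```
    
    With the original metadata [a, b, c], the residual fields "b" and "c" carry
    IDs 1 and 2. Rebuilding them against an empty dictionary renumbers them to
    0 and 1, which the original metadata resolves to "a" and "b"; the shallow
    copy keeps 1 and 2 and resolves correctly.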
    
    ### Why are the changes needed?
    
    Variant shredding correctness.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, shredding has not yet been released.
    
    ### How was this patch tested?
    
    Added a unit test that fails without the fix.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49031 from cashmand/SPARK-48898-bugfix.
    
    Authored-by: cashmand <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/types/variant/VariantShreddingWriter.java    |  6 +++---
 .../org/apache/spark/sql/VariantWriteShreddingSuite.scala     | 11 +++++++++++
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java b/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java
index b5f8ea0a1484..bbee7ee0dca3 100644
--- a/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java
+++ b/common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java
@@ -101,7 +101,9 @@ public class VariantShreddingWriter {
           int id = v.getDictionaryIdAtIndex(i);
           fieldEntries.add(new VariantBuilder.FieldEntry(
               field.key, id, variantBuilder.getWritePos() - start));
-          variantBuilder.appendVariant(field.value);
+          // shallowAppendVariant is needed for correctness, since we're relying on the metadata IDs
+          // being unchanged.
+          variantBuilder.shallowAppendVariant(field.value);
         }
       }
       if (numFieldsMatched < objectSchema.length) {
@@ -133,8 +135,6 @@ public class VariantShreddingWriter {
         // Store the typed value.
         result.addScalar(typedValue);
       } else {
-        VariantBuilder variantBuilder = new VariantBuilder(false);
-        variantBuilder.appendVariant(v);
         result.addVariantValue(v.getValue());
       }
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala
index ed66ddb1f0f4..a62c6e446246 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala
@@ -179,6 +179,17 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
     // Not an object
     testWithSchema(obj, ArrayType(StructType.fromDDL("a int, b string")),
       Row(obj.getMetadata, untypedValue(obj), null))
+
+    // Similar to the case above where "b" was not in the shredding schema, but with the unshredded
+    // value being an object. Check that the copied value has correct dictionary IDs.
+    val obj2 = parseJson("""{"a": 1, "b": {"c": "hello"}}""")
+    val residual2 = untypedValue("""{"b": {"c": "hello"}}""")
+    // First byte is the type, second is number of fields, and the third is the ID for "b"
+    residual2(2) = 1
+    // Followed by 2 bytes for offsets, inner object type and number of fields, then ID for "c".
+    residual2(7) = 2
+    residual2(7) = 2
+    testWithSchema(obj2, StructType.fromDDL("a int, c string"),
+      Row(obj2.getMetadata, residual2, Row(Row(null, 1), Row(null, null))))
   }
 
   test("shredding as array") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
