This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 42cd961fa97d [SPARK-48587][VARIANT] Avoid storage amplification when 
accessing a sub-Variant
42cd961fa97d is described below

commit 42cd961fa97d8a248fd12ec00d248b69b4db145f
Author: cashmand <[email protected]>
AuthorDate: Mon Jun 17 13:26:42 2024 +0800

    [SPARK-48587][VARIANT] Avoid storage amplification when accessing a 
sub-Variant
    
    ### What changes were proposed in this pull request?
    
    Previously, if `variant_get` returned a Variant or a nested type containing 
Variant, we would just return the slice of the value along with the full 
metadata. Typically, most of the metadata is relevant to other parts of the 
original Variant value, and is not needed, so the resulting binary is larger 
than needed. This can be very expensive if the value is written to disk (e.g. 
parquet or shuffle file).
    
    ### Why are the changes needed?
    
    Avoid unnecessarily large Variant values in memory and on disk.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the resulting Variant is logically the same as before, only storage 
size should change.
    
    ### How was this patch tested?
    
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46941 from cashmand/SPARK-48587.
    
    Authored-by: cashmand <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../expressions/variant/variantExpressions.scala   |  8 +++-
 .../scala/org/apache/spark/sql/VariantSuite.scala  | 43 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
index 26ab2f08b644..b80fb11b6813 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
@@ -322,7 +322,13 @@ case object VariantGet {
       }
     }
 
-    if (dataType == VariantType) return new VariantVal(v.getValue, 
v.getMetadata)
+    if (dataType == VariantType) {
+      // Build a new variant, in order to strip off any unnecessary metadata.
+      val builder = new VariantBuilder
+      builder.appendVariant(v)
+      val result = builder.result()
+      return new VariantVal(result.getValue, result.getMetadata)
+    }
     val variantType = v.getType
     if (variantType == Type.NULL) return null
     dataType match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index caab98b6239a..0c00676607dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -584,4 +584,47 @@ class VariantSuite extends QueryTest with 
SharedSparkSession with ExpressionEval
       checkEvaluation(castVariantExpr, sqlVariantExpr.eval())
     }
   }
+
+  test("variant_get size") {
+    val largeKey = "x" * 1000
+    val df = Seq(s"""{ "$largeKey": {"a" : 1 },
+                       "b" : 2,
+                       "c": [1,2,3,{"$largeKey": 4}] }""").toDF("json")
+            .selectExpr("parse_json(json) as v")
+
+    // Check Variant with approximate bounds to avoid flakiness if we make 
minor format changes.
+    def checkSize(v: VariantVal, minMetadata: Long, maxMetadata: Long,
+                  minValue: Long, maxValue: Long): Unit = {
+      val mSize = v.getMetadata.length
+      assert(mSize >= minMetadata)
+      assert(mSize <= maxMetadata)
+      val vSize = v.getValue.length
+      assert(vSize >= minValue)
+      assert(vSize <= maxValue)
+    }
+
+    // The full Variant has large metadata (but only one copy of `largeKey`).
+    checkSize(df.selectExpr("variant_get(v, '$', 'variant')").collect()(0)
+        .getAs[VariantVal](0), 1000, 1050, 20, 40)
+    // Extracting Variant or a nested type containing Variant should strip out 
the large metadata.
+    checkSize(df.selectExpr("variant_get(v, '$.b', 'variant')").collect()(0)
+        .getAs[VariantVal](0), 2, 4, 2, 4)
+    // Behavior is the same without an explicit cast to Variant.
+    checkSize(df.selectExpr("variant_get(v, '$.b')").collect()(0)
+        .getAs[VariantVal](0), 2, 4, 2, 4)
+    checkSize(df.selectExpr(s"variant_get(v, '$$.$largeKey', 
'variant')").collect()(0)
+        .getAs[VariantVal](0), 5, 10, 5, 10)
+    checkSize(df.selectExpr(s"variant_get(v, '$$.$largeKey', 
'struct<a:variant>')")
+        .collect()(0).getStruct(0).getAs[VariantVal](0), 2, 4, 2, 4)
+    // Only the array element that contains `largeKey` should be large.
+    checkSize(df.selectExpr("variant_get(v, '$.c', 
'array<variant>')").collect()(0)
+        .getSeq[VariantVal](0)(0), 2, 4, 2, 4)
+    checkSize(df.selectExpr("variant_get(v, '$.c', 
'array<variant>')").collect()(0)
+        .getSeq[VariantVal](0)(3), 1000, 1020, 5, 10)
+    // Cast to a nested type containing Variant should also remove metadata.
+    val structResult = df.selectExpr(s"cast(v as 
struct<$largeKey:variant,b:variant>)").collect()(0)
+        .getStruct(0)
+    checkSize(structResult.getAs[VariantVal](0), 5, 10, 5, 10)
+    checkSize(structResult.getAs[VariantVal](1), 2, 4, 2, 4)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to