This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 42cd961fa97d [SPARK-48587][VARIANT] Avoid storage amplification when
accessing a sub-Variant
42cd961fa97d is described below
commit 42cd961fa97d8a248fd12ec00d248b69b4db145f
Author: cashmand <[email protected]>
AuthorDate: Mon Jun 17 13:26:42 2024 +0800
[SPARK-48587][VARIANT] Avoid storage amplification when accessing a
sub-Variant
### What changes were proposed in this pull request?
Previously, if `variant_get` returned a Variant or a nested type containing
Variant, we would just return the slice of the value along with the full
metadata. Typically, most of the metadata is relevant to other parts of the
original Variant value, and is not needed, so the resulting binary is larger
than needed. This can be very expensive if the value is written to disk (e.g.
parquet or shuffle file).
### Why are the changes needed?
Avoid unnecessarily large Variant values in memory and on disk.
### Does this PR introduce _any_ user-facing change?
No, the resulting Variant is logically the same as before, only storage
size should change.
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #46941 from cashmand/SPARK-48587.
Authored-by: cashmand <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../expressions/variant/variantExpressions.scala | 8 +++-
.../scala/org/apache/spark/sql/VariantSuite.scala | 43 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
index 26ab2f08b644..b80fb11b6813 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala
@@ -322,7 +322,13 @@ case object VariantGet {
}
}
- if (dataType == VariantType) return new VariantVal(v.getValue,
v.getMetadata)
+ if (dataType == VariantType) {
+ // Build a new variant, in order to strip off any unnecessary metadata.
+ val builder = new VariantBuilder
+ builder.appendVariant(v)
+ val result = builder.result()
+ return new VariantVal(result.getValue, result.getMetadata)
+ }
val variantType = v.getType
if (variantType == Type.NULL) return null
dataType match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index caab98b6239a..0c00676607dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -584,4 +584,47 @@ class VariantSuite extends QueryTest with
SharedSparkSession with ExpressionEval
checkEvaluation(castVariantExpr, sqlVariantExpr.eval())
}
}
+
+ test("variant_get size") {
+ val largeKey = "x" * 1000
+ val df = Seq(s"""{ "$largeKey": {"a" : 1 },
+ "b" : 2,
+ "c": [1,2,3,{"$largeKey": 4}] }""").toDF("json")
+ .selectExpr("parse_json(json) as v")
+
+ // Check Variant with approximate bounds to avoid flakiness if we make
minor format changes.
+ def checkSize(v: VariantVal, minMetadata: Long, maxMetadata: Long,
+ minValue: Long, maxValue: Long): Unit = {
+ val mSize = v.getMetadata.length
+ assert(mSize >= minMetadata)
+ assert(mSize <= maxMetadata)
+ val vSize = v.getValue.length
+ assert(vSize >= minValue)
+ assert(vSize <= maxValue)
+ }
+
+ // The full Variant has large metadata (but only one copy of `largeKey`).
+ checkSize(df.selectExpr("variant_get(v, '$', 'variant')").collect()(0)
+ .getAs[VariantVal](0), 1000, 1050, 20, 40)
+ // Extracting Variant or a nested type containing Variant should strip out
the large metadata.
+ checkSize(df.selectExpr("variant_get(v, '$.b', 'variant')").collect()(0)
+ .getAs[VariantVal](0), 2, 4, 2, 4)
+ // Behavior is the same without an explicit cast to Variant.
+ checkSize(df.selectExpr("variant_get(v, '$.b')").collect()(0)
+ .getAs[VariantVal](0), 2, 4, 2, 4)
+ checkSize(df.selectExpr(s"variant_get(v, '$$.$largeKey',
'variant')").collect()(0)
+ .getAs[VariantVal](0), 5, 10, 5, 10)
+ checkSize(df.selectExpr(s"variant_get(v, '$$.$largeKey',
'struct<a:variant>')")
+ .collect()(0).getStruct(0).getAs[VariantVal](0), 2, 4, 2, 4)
+ // Only the array element that contains `largeKey` should be large.
+ checkSize(df.selectExpr("variant_get(v, '$.c',
'array<variant>')").collect()(0)
+ .getSeq[VariantVal](0)(0), 2, 4, 2, 4)
+ checkSize(df.selectExpr("variant_get(v, '$.c',
'array<variant>')").collect()(0)
+ .getSeq[VariantVal](0)(3), 1000, 1020, 5, 10)
+ // Cast to a nested type containing Variant should also remove metadata.
+ val structResult = df.selectExpr(s"cast(v as
struct<$largeKey:variant,b:variant>)").collect()(0)
+ .getStruct(0)
+ checkSize(structResult.getAs[VariantVal](0), 5, 10, 5, 10)
+ checkSize(structResult.getAs[VariantVal](1), 2, 4, 2, 4)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]