[
https://issues.apache.org/jira/browse/SPARK-57499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qiegang Long updated SPARK-57499:
---------------------------------
Description:
There are three issues with variant extraction pushdown in DSv2.
h3. Issue 1: column pruning is skipped when variant pushdown is accepted
{{V2ScanRelationPushDown}} runs pushdown steps in order:
{code:java}
pushDownVariants            // records extraction on ScanBuilderHolder
...
buildScanWithPushedVariants // calls builder.build(), replaces ScanBuilderHolder
pruneColumns                // matches ScanBuilderHolder only: no-op, holder is gone
{code}
Because the holder has already been replaced by the built scan,
{*}builder.pruneColumns() is never called{*}. The scan reads the full table
schema, including unreferenced columns. This is most expensive for unreferenced
VARIANT columns: each is fully reconstructed from its shredded Parquet tree on
every row instead of being pruned.
This only affects the accepted pushdown path. When pushdown is declined or
disabled, the ScanBuilderHolder survives and pruneColumns runs normally.
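The ordering problem can be reproduced with a toy model. The sketch below is not Spark code: {{Builder}}, {{Holder}}, {{BuiltScan}} and the three rule functions are hypothetical stand-ins that only mimic how each step matches on the holder node, assuming a query that references just {{v1}}.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class BuiltScan:
    output: list  # columns the scan will actually read


@dataclass
class Builder:
    schema: list                      # full table schema
    pruned_to: Optional[list] = None  # set only if prune_columns() runs

    def prune_columns(self, required):
        self.pruned_to = [c for c in self.schema if c in required]

    def build(self):
        cols = self.pruned_to if self.pruned_to is not None else list(self.schema)
        return BuiltScan(cols)


@dataclass
class Holder:
    builder: Builder
    pushed_variants: list = field(default_factory=list)


def push_down_variants(node, accept):
    # Records the extraction on the holder when the source accepts it.
    if isinstance(node, Holder) and accept:
        node.pushed_variants.append(("v1", "$.x"))
    return node


def build_scan_with_pushed_variants(node):
    # Builds the scan early, replacing the holder in the plan.
    if isinstance(node, Holder) and node.pushed_variants:
        return node.builder.build()
    return node


def prune_columns(node, required):
    # Matches the holder only; a BuiltScan falls through untouched.
    if isinstance(node, Holder):
        node.builder.prune_columns(required)
    return node


def optimize(accept):
    node = Holder(Builder(schema=["a", "v1", "v2"]))
    node = push_down_variants(node, accept)
    node = build_scan_with_pushed_variants(node)
    node = prune_columns(node, required={"v1"})
    if isinstance(node, Holder):  # normal path: build after pruning
        node = node.builder.build()
    return node.output


accepted = optimize(accept=True)
declined = optimize(accept=False)
print(accepted)  # ['a', 'v1', 'v2']  (pruning skipped)
print(declined)  # ['v1']             (pruning ran)
```

In this model the accepted path reads all three columns while the declined path reads only {{v1}}, which mirrors the asymmetry described above.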
h3. Issue 2: invalid plan / crash on tables with ≥2 VARIANT columns
{{pushDownVariants}} uses {{transformDown}}, which recurses into the child
{{ScanBuilderHolder}} after returning the plan unchanged. When the bare
{{ScanBuilderHolder}} matches PhysicalOperation a second time, it collects
unreferenced sibling VARIANT columns as full-variant requests and pushes them
to the builder. ParquetScanBuilder overwrites its state on every call, so the
second push clobbers the correct extraction from the first. The result is a
dangling ExprId in the projection and a *runtime crash:*
{code:java}
[INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#57 in [a#72,v1#73,v2#74]
{code}
The broken plan:
{code:java}
*(1) !Project [variant_get(v1#57, $.x) ...]
+- BatchScan parquet [a#66, v1#67, v2#68]
PushedVariantExtractions: [v2:"$":VariantType]
{code}
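The clobbering can be illustrated with a small simulation. Everything here is a hypothetical stand-in rather than Catalyst code: {{ToyBuilder}} models a builder that overwrites its state on each push, and {{rule}} models a match that fires both on Project-over-holder and on a bare holder. The sketch also assumes that the first push retypes {{v1}} so that only {{v2}} still looks like a VARIANT column on the second match, which is consistent with the broken plan shown above but is not confirmed by the report.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBuilder:
    pushed: list = field(default_factory=list)
    history: list = field(default_factory=list)

    def push(self, requests):
        self.history.append(list(requests))
        self.pushed = list(requests)  # overwrite, not merge


@dataclass
class Holder:
    builder: ToyBuilder
    types: dict          # column name -> type name
    children: tuple = ()


@dataclass
class Project:
    extractions: list    # (column, path) pairs used by the projection
    children: tuple


def rule(node):
    if isinstance(node, Project) and isinstance(node.children[0], Holder):
        holder, requests = node.children[0], list(node.extractions)
    elif isinstance(node, Holder):
        # Bare holder: every remaining VARIANT column becomes a "$" request.
        holder = node
        requests = [(c, "$") for c, t in node.types.items() if t == "variant"]
    else:
        return node
    if requests:
        holder.builder.push(requests)
        for col, _ in requests:
            holder.types[col] = "struct"  # assumed retyping after a push
    return node  # the plan node itself is returned unchanged


def transform_down(node, fn):
    node = fn(node)
    for child in node.children:  # still descends into the holder
        transform_down(child, fn)
    return node


builder = ToyBuilder()
holder = Holder(builder, {"a": "int", "v1": "variant", "v2": "variant"})
plan = Project([("v1", "$.x")], (holder,))
transform_down(plan, rule)

print(builder.history)  # [[('v1', '$.x')], [('v2', '$')]]
print(builder.pushed)   # [('v2', '$')]
```

The projection still expects the {{v1}} extraction from the first push, but the builder only remembers the {{v2}} request from the second.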
h3. Issue 3: wrong results / crash when a whole-variant read is shredded
A bare variant reference (a plain `SELECT v`, or a column lifted to feed a
variant_get above a Join/Sort/Aggregate barrier the local rewrite cannot see)
is recorded as a full-variant request (path "$"). Shredding it to a lone
full-variant slot is both useless (the whole value is read regardless) and
mishandled:
- The Parquet reader collapses a lone VariantType slot to a boolean
placeholder, so ORDER BY variant_get(v, '$.price') sorts on the placeholder
and silently returns the wrong order, and max(variant_get(v, '$.price')) fails
to codegen.
- A join key is re-exposed above the join as GetStructField(v_new, i) AS
v#orig; RemoveRedundantAliases collapses the alias and the condition
references a dropped ExprId, failing plan validation.
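The silent wrong-order case from the first bullet can be pictured with a toy example. This is only an illustration under stated assumptions: {{read_collapsed}} and {{sort_key}} are hypothetical helpers, and the way the key falls back to the placeholder is assumed for the sketch rather than taken from the Parquet reader.

```python
rows = [
    {"a": 1, "v": {"price": 3, "name": "x"}},
    {"a": 2, "v": {"price": 1, "name": "z"}},
]


def read_whole(row):
    # The reader hands back the real variant value.
    return row["v"]


def read_collapsed(row):
    # The lone full-variant slot has been collapsed to a boolean placeholder.
    return True


def sort_key(value):
    # Stand-in for variant_get(v, '$.price'); with a placeholder there is
    # nothing to extract, so the placeholder itself ends up as the key.
    return value["price"] if isinstance(value, dict) else value


expected = [r["a"] for r in sorted(rows, key=lambda r: sort_key(read_whole(r)))]
actual = [r["a"] for r in sorted(rows, key=lambda r: sort_key(read_collapsed(r)))]

print(expected)  # [2, 1]  ordered by price
print(actual)    # [1, 2]  all keys equal, input order kept
```

No error is raised in the collapsed case; every key compares equal, so the sort simply preserves input order.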
h3. Reproduce (stock spark-4.1.2-bin-hadoop3)
Use a path-based view to force DSv2:
{code:sql}
-- Clear the V1 source list so that Parquet reads go through DSv2.
SET spark.sql.sources.useV1SourceList = "";
CREATE TABLE t (a INT, v1 VARIANT, v2 VARIANT) USING PARQUET LOCATION '/tmp/vt';
INSERT INTO t VALUES
  (1, parse_json('{"x":1,"price":3,"name":"x"}'), parse_json('{"y":2}')),
  (2, parse_json('{"x":9,"price":1,"name":"z"}'), parse_json('{"y":8}'));
CREATE OR REPLACE TEMPORARY VIEW tv USING parquet OPTIONS (path '/tmp/vt');
CREATE OR REPLACE TEMPORARY VIEW codes USING parquet OPTIONS (path '/tmp/vt');

-- All four crash with:
-- [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#NN in [...]
SELECT variant_get(v1, '$.x', 'int') FROM tv;
SELECT variant_get(v1, '$.name', 'string') AS nm
  FROM tv ORDER BY variant_get(v1, '$.price', 'int');
SELECT max(variant_get(v1, '$.price', 'int')) FROM tv;
SELECT l.a FROM tv l JOIN codes r
  ON variant_get(l.v1, '$.x', 'int') = variant_get(r.v1, '$.x', 'int');

-- Example failure:
-- [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#21 in [a#33,v1#34,v2#35]. SQLSTATE: XX000
--   at org.apache.spark.sql.catalyst.expressions.BindReferences$.attributeNotFoundException(BoundAttribute.scala:109)
--   ...
{code}
> Fix column pruning and invalid plans in variant extraction pushdown on DSv2 scans
> ---------------------------------------------------------------------------------
>
> Key: SPARK-57499
> URL: https://issues.apache.org/jira/browse/SPARK-57499
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.1.0, 4.1.2
> Reporter: Qiegang Long
> Priority: Major
> Labels: pull-request-available