[ https://issues.apache.org/jira/browse/SPARK-57499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qiegang Long updated SPARK-57499:
---------------------------------
    Description: 
There are three issues with variant extraction pushdown in DSv2.
h3. Issue 1: column pruning is skipped when variant pushdown is accepted

{{V2ScanRelationPushDown}} runs pushdown steps in order:
{code:java}
pushDownVariants             // records extraction on ScanBuilderHolder
...
buildScanWithPushedVariants  // calls builder.build(), replaces ScanBuilderHolder
pruneColumns                 // matches ScanBuilderHolder only; no-op, holder is gone
{code}
{*}builder.pruneColumns() is never called{*}. The scan reads the full table 
schema, including unreferenced columns. This is most expensive for unreferenced 
VARIANT columns — each is fully reconstructed from its shredded Parquet tree on 
every row instead of being pruned.

This only affects the accepted pushdown path. When pushdown is declined or 
disabled, the ScanBuilderHolder survives and pruneColumns runs normally.
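
The ordering problem can be illustrated with a small, self-contained Java toy model (Java 16+). {{Node}}, {{Holder}}, {{BuiltScan}} and the step functions are hypothetical stand-ins named after the steps above, not Spark's real classes; the only behavior they copy from the description is that {{pruneColumns}} matches a holder and nothing else. Running the steps in the order described leaves the full schema in the built scan, while pruning before the build (one possible reordering, shown only for illustration) narrows it.
{code:java}
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical toy model of the step ordering in Issue 1. These are NOT Spark classes;
// they only mimic which plan shape each pushdown step matches.
final class PruneOrderingSketch {
  interface Node {}

  // Open builder: the required schema can still be narrowed.
  record Holder(List<String> requiredSchema, Set<String> referenced, boolean variantsPushed)
      implements Node {}

  // Built scan: the read schema is fixed.
  record BuiltScan(List<String> readSchema) implements Node {}

  // Records the extraction on the holder; the plan shape does not change.
  static Node pushDownVariants(Node n) {
    return n instanceof Holder h ? new Holder(h.requiredSchema(), h.referenced(), true) : n;
  }

  // Builds the scan if variants were pushed, replacing the holder.
  static Node buildScanWithPushedVariants(Node n) {
    return n instanceof Holder h && h.variantsPushed() ? new BuiltScan(h.requiredSchema()) : n;
  }

  // Narrows the required schema to referenced columns, but only matches a Holder.
  static Node pruneColumns(Node n) {
    if (n instanceof Holder h) {
      List<String> pruned =
          h.requiredSchema().stream().filter(h.referenced()::contains).toList();
      return new Holder(pruned, h.referenced(), h.variantsPushed());
    }
    return n; // a BuiltScan is left untouched: pruning silently does nothing
  }

  static Node run(Node start, List<UnaryOperator<Node>> steps) {
    Node n = start;
    for (UnaryOperator<Node> step : steps) {
      n = step.apply(n);
    }
    return n;
  }

  public static void main(String[] args) {
    Holder start = new Holder(List.of("a", "v1", "v2"), Set.of("v1"), false);

    // Order described in the issue: pruning runs after the holder has been replaced.
    List<UnaryOperator<Node>> issueOrder = List.of(
        PruneOrderingSketch::pushDownVariants,
        PruneOrderingSketch::buildScanWithPushedVariants,
        PruneOrderingSketch::pruneColumns);

    // One possible reordering: prune while the holder still exists, then build.
    List<UnaryOperator<Node>> pruneFirst = List.of(
        PruneOrderingSketch::pushDownVariants,
        PruneOrderingSketch::pruneColumns,
        PruneOrderingSketch::buildScanWithPushedVariants);

    System.out.println(run(start, issueOrder)); // BuiltScan[readSchema=[a, v1, v2]]
    System.out.println(run(start, pruneFirst)); // BuiltScan[readSchema=[v1]]
  }
}
{code}
Whatever the real fix looks like, the invariant the model checks is the same: the column-pruning step has to see the builder before {{build()}} freezes the read schema.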
h3. Issue 2: invalid plan on native Parquet V2

{{pushDownVariants}} uses {{transformDown}}, which recurses into the child {{ScanBuilderHolder}} after the rule returns the parent plan unchanged. When the bare {{ScanBuilderHolder}} matches {{PhysicalOperation}} a second time, the rule collects unreferenced sibling VARIANT columns as full-variant requests and pushes them to the builder. {{ParquetScanBuilder}} overwrites its state on every call, so the second push clobbers the correct extraction from the first. The result is a dangling ExprId in the projection and a *runtime crash:*
{code:java}
[INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#57 in [a#72,v1#73,v2#74]
{code}
The broken plan:
{code:java}
*(1) !Project [variant_get(v1#57, $.x) ...]        
+- BatchScan parquet [a#66, v1#67, v2#68]           
   PushedVariantExtractions: [v2:"$":VariantType]    
{code}
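
The clobbering can be reproduced in miniature with the hypothetical Java sketch below (Java 16+). {{Builder}} mimics only the last-write-wins behavior attributed to {{ParquetScanBuilder}}; {{Holder}}, {{Project}} and {{pushDown}} are simplified stand-ins, and the sibling columns the second match collects are passed in directly rather than derived the way Spark derives them. With {{descendAfterMatch = true}} the child is visited a second time, as with {{transformDown}}, and the final pushed state matches the broken plan above; not descending after the parent match (one possible guard, shown only for illustration) keeps the original extraction.
{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of Issue 2 (NOT Spark classes): a top-down pass that visits the
// child holder again after its parent already pushed, plus a last-write-wins builder.
final class DoublePushSketch {
  // Mimics only the overwrite-on-every-call behavior described for the Parquet builder.
  static final class Builder {
    private List<String> pushed = List.of();

    void pushVariantExtractions(List<String> requests) {
      pushed = List.copyOf(requests); // replaces, never merges, earlier requests
    }

    List<String> pushed() {
      return pushed;
    }
  }

  interface Plan {}

  // Bare holder; the sibling variant columns a second match would collect are given directly.
  record Holder(Builder builder, List<String> unreferencedVariants) implements Plan {}

  // Projection over the holder carrying the real extractions, e.g. v1:"$.x".
  record Project(List<String> extractions, Holder child) implements Plan {}

  // One rule application: Project-over-Holder pushes the real extractions; a bare Holder
  // pushes its unreferenced variant columns as full-variant "$" requests.
  static void matchAndPush(Plan p) {
    if (p instanceof Project proj) {
      proj.child().builder().pushVariantExtractions(proj.extractions());
    } else if (p instanceof Holder h) {
      List<String> full = new ArrayList<>();
      for (String col : h.unreferencedVariants()) {
        full.add(col + ":\"$\"");
      }
      h.builder().pushVariantExtractions(full);
    }
  }

  // descendAfterMatch = true behaves like transformDown: the child is visited after the parent.
  static void pushDown(Plan p, boolean descendAfterMatch) {
    matchAndPush(p);
    if (descendAfterMatch && p instanceof Project proj) {
      pushDown(proj.child(), true);
    }
  }

  static List<String> pushedFor(boolean descendAfterMatch) {
    Builder builder = new Builder();
    Plan plan = new Project(List.of("v1:\"$.x\""), new Holder(builder, List.of("v2")));
    pushDown(plan, descendAfterMatch);
    return builder.pushed();
  }

  public static void main(String[] args) {
    System.out.println(pushedFor(true));  // [v2:"$"]  (the v1 extraction was clobbered)
    System.out.println(pushedFor(false)); // [v1:"$.x"]
  }
}
{code}
Either side of the interaction is enough to trigger the crash: a traversal that revisits the holder, or a builder that replaces rather than merges its state.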

h3. Issue 3: wrong results or crash when a whole-variant read is shredded

A bare variant reference (a plain {{SELECT v}}, or a column lifted to feed a {{variant_get}} above a Join/Sort/Aggregate barrier that the local rewrite cannot see) is recorded as a full-variant request (path {{"$"}}). Shredding it to a lone full-variant slot is both useless (the whole value is read regardless) and mishandled:

- The Parquet reader collapses a lone {{VariantType}} slot to a boolean placeholder, so {{ORDER BY variant_get(v, '$.price')}} sorts on the placeholder and silently returns the wrong order, and {{max(variant_get(v, '$.price'))}} fails to codegen.
- A join key is re-exposed above the join as {{GetStructField(v_new, i) AS v#orig}}; {{RemoveRedundantAliases}} collapses the alias, so the join condition references a dropped ExprId and plan validation fails.
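
Because a lone {{"$"}} request gains nothing from shredding, one possible mitigation is to drop such requests before they reach the builder, so the column is read as a plain variant. The Java sketch below (Java 16+) shows that filter over a hypothetical map from column name to requested paths; the map layout and all names are assumptions, not the rule's actual data structures, and columns that mix {{"$"}} with narrower paths are left unchanged.
{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (NOT Spark's code): drop pushdown requests whose only path is "$".
final class LoneFullVariantFilterSketch {
  static final String FULL_VARIANT_PATH = "$";

  // Keeps a column's requests unless every requested path is the whole value ("$").
  // Such a column would become a lone full-variant slot, which saves nothing because the
  // whole value is read anyway, so it is left out and read as a plain variant column.
  static Map<String, List<String>> dropLoneFullVariantRequests(
      Map<String, List<String>> requests) {
    Map<String, List<String>> kept = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> e : requests.entrySet()) {
      List<String> paths = e.getValue();
      boolean loneFullVariant =
          !paths.isEmpty() && paths.stream().allMatch(FULL_VARIANT_PATH::equals);
      if (!loneFullVariant) {
        kept.put(e.getKey(), paths);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, List<String>> requests = new LinkedHashMap<>();
    requests.put("v1", List.of("$.x", "$.price")); // real extractions: still pushed
    requests.put("v2", List.of("$"));              // bare reference: not shredded
    System.out.println(dropLoneFullVariantRequests(requests)); // {v1=[$.x, $.price]}
  }
}
{code}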

h3. Reproduce (stock spark-4.1.2-bin-hadoop3)

Use a path-based view to force DSv2:
{code:java}
SET spark.sql.sources.useV1SourceList = "";

CREATE TABLE t (a INT, v1 VARIANT, v2 VARIANT) USING PARQUET LOCATION '/tmp/vt';
INSERT INTO t VALUES
  (1, parse_json('{"x":1,"price":3,"name":"x"}'), parse_json('{"y":2}')),
  (2, parse_json('{"x":9,"price":1,"name":"z"}'), parse_json('{"y":8}'));
CREATE OR REPLACE TEMPORARY VIEW tv    USING parquet OPTIONS (path '/tmp/vt');
CREATE OR REPLACE TEMPORARY VIEW codes USING parquet OPTIONS (path '/tmp/vt');

-- All four crash with: [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#NN in [...]
SELECT variant_get(v1, '$.x', 'int') FROM tv;
SELECT variant_get(v1, '$.name', 'string') AS nm
  FROM tv ORDER BY variant_get(v1, '$.price', 'int');
SELECT max(variant_get(v1, '$.price', 'int')) FROM tv;
SELECT l.a FROM tv l JOIN codes r
  ON variant_get(l.v1, '$.x', 'int') = variant_get(r.v1, '$.x', 'int');

-- Example failure:
-- [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#21 in [a#33,v1#34,v2#35]. SQLSTATE: XX000
--   at org.apache.spark.sql.catalyst.expressions.BindReferences$.attributeNotFoundException(BoundAttribute.scala:109)
--   ...
{code}


  was:
There are three issues with variant extraction pushdown in DSv2.
h3. Issue 1: column pruning is skipped when variant pushdown is accepted

{{V2ScanRelationPushDown}} runs pushdown steps in order:
{code:java}
pushDownVariants             // records extraction on ScanBuilderHolder
...
buildScanWithPushedVariants  // calls builder.build(), replaces ScanBuilderHolder
pruneColumns                 // matches ScanBuilderHolder only; no-op, holder is gone
{code}
{*}builder.pruneColumns() is never called{*}. The scan reads the full table 
schema, including unreferenced columns. This is most expensive for unreferenced 
VARIANT columns — each is fully reconstructed from its shredded Parquet tree on 
every row instead of being pruned.

This only affects the accepted pushdown path. When pushdown is declined or 
disabled, the ScanBuilderHolder survives and pruneColumns runs normally.
h3. Issue 2: invalid plan on native Parquet V2

{{pushDownVariants}} uses {{transformDown}}, which recurses into the child {{ScanBuilderHolder}} after the rule returns the parent plan unchanged. When the bare {{ScanBuilderHolder}} matches {{PhysicalOperation}} a second time, the rule collects unreferenced sibling VARIANT columns as full-variant requests and pushes them to the builder. {{ParquetScanBuilder}} overwrites its state on every call, so the second push clobbers the correct extraction from the first. The result is a dangling ExprId in the projection and a *runtime crash:*
{code:java}
[INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#57 in [a#72,v1#73,v2#74]
{code}
The broken plan:
{code:java}
*(1) !Project [variant_get(v1#57, $.x) ...]        
+- BatchScan parquet [a#66, v1#67, v2#68]           
   PushedVariantExtractions: [v2:"$":VariantType]    
{code}

h3. Issue 3: wrong results or crash when a whole-variant read is shredded

A bare variant reference (a plain {{SELECT v}}, or a column lifted to feed a {{variant_get}} above a Join/Sort/Aggregate barrier that the local rewrite cannot see) is recorded as a full-variant request (path {{"$"}}). Shredding it to a lone full-variant slot is both useless (the whole value is read regardless) and mishandled:

- The Parquet reader collapses a lone {{VariantType}} slot to a boolean placeholder, so {{ORDER BY variant_get(v, '$.price')}} sorts on the placeholder and silently returns the wrong order, and {{max(variant_get(v, '$.price'))}} fails to codegen.
- A join key is re-exposed above the join as {{GetStructField(v_new, i) AS v#orig}}; {{RemoveRedundantAliases}} collapses the alias, so the join condition references a dropped ExprId and plan validation fails.

h3. Reproduce (stock spark-4.1.2-bin-hadoop3)

Use a path-based view to force DSv2:
{code:java}
SET spark.sql.sources.useV1SourceList = "";
CREATE TABLE t (a INT, v1 VARIANT, v2 VARIANT) USING PARQUET LOCATION '/tmp/vt';
INSERT INTO t VALUES (1, parse_json('{"x":1}'), parse_json('{"y":2}'));
CREATE OR REPLACE TEMPORARY VIEW tv USING parquet OPTIONS (path '/tmp/vt');

SELECT variant_get(v1, '$.x', 'int') FROM tv;
{code}
*The SELECT crashes.*


> Fix column pruning and invalid plans in variant extraction pushdown on DSv2 scans
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-57499
>                 URL: https://issues.apache.org/jira/browse/SPARK-57499
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.1.0, 4.1.2
>            Reporter: Qiegang Long
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
