[ https://issues.apache.org/jira/browse/SPARK-57499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-57499:
-----------------------------------

    Assignee: Qiegang Long

> Fix column pruning and invalid plans in variant extraction pushdown on DSv2 scans
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-57499
>                 URL: https://issues.apache.org/jira/browse/SPARK-57499
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.1.0, 4.1.2
>            Reporter: Qiegang Long
>            Assignee: Qiegang Long
>            Priority: Major
>              Labels: pull-request-available
>
> There are three issues with variant extraction pushdown in DSv2.
> h3. Issue 1: column pruning is skipped when variant pushdown is accepted
> {{V2ScanRelationPushDown}} runs pushdown steps in order:
> {code:java}
> pushDownVariants             // records the extraction on ScanBuilderHolder
> ...
> buildScanWithPushedVariants  // calls builder.build() and replaces ScanBuilderHolder
> pruneColumns                 // matches ScanBuilderHolder only: no-op, the holder is gone
> {code}
> {*}builder.pruneColumns() is never called{*}. The scan reads the full table 
> schema, including unreferenced columns. This is most expensive for 
> unreferenced VARIANT columns — each is fully reconstructed from its shredded 
> Parquet tree on every row instead of being pruned.
> This only affects the accepted pushdown path. When pushdown is declined or 
> disabled, the ScanBuilderHolder survives and pruneColumns runs normally.
> h3. Issue 2: invalid plan / crash on tables with ≥2 VARIANT columns
> {{pushDownVariants}} uses {{transformDown}}, which still recurses into the
> child {{ScanBuilderHolder}} after the rule returns the plan unchanged. When the
> bare {{ScanBuilderHolder}} matches {{PhysicalOperation}} a second time, it
> collects the unreferenced sibling VARIANT columns as full-variant requests and
> pushes them to the builder. {{ParquetScanBuilder}} overwrites its state on
> every call, so the second push clobbers the correct extraction from the first.
> The result is a dangling ExprId in the projection and a *runtime crash:*
> {code:java}
> [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#57 in [a#72,v1#73,v2#74]
> {code}
> The broken plan:
> {code:java}
> *(1) !Project [variant_get(v1#57, $.x) ...]        
> +- BatchScan parquet [a#66, v1#67, v2#68]           
>    PushedVariantExtractions: [v2:"$":VariantType]    
> {code}
> h3. Issue 3: wrong results / crash when a whole-variant read is shredded
> A bare variant reference (a plain {{SELECT v}}, or a column lifted to feed a
> {{variant_get}} above a Join/Sort/Aggregate barrier that the local rewrite
> cannot see) is recorded as a full-variant request (path "$"). Shredding it to
> a lone full-variant slot is both useless (the whole value is read regardless)
> and mishandled:
>  - The Parquet reader collapses a lone VariantType slot to a boolean placeholder, so {{ORDER BY variant_get(v, '$.price')}} sorts on the placeholder and silently returns the wrong order, and {{max(variant_get(v, '$.price'))}} fails to codegen.
>  - A join key is re-exposed above the join as {{GetStructField(v_new, i) AS v#orig}}; {{RemoveRedundantAliases}} collapses the alias, and the join condition then references a dropped ExprId, failing plan validation.
> h3. Reproduce (stock spark-4.1.2-bin-hadoop3)
> Use a path-based view to force DSv2:
> {code:sql}
> SET spark.sql.sources.useV1SourceList = "";
> CREATE TABLE t (a INT, v1 VARIANT, v2 VARIANT) USING PARQUET LOCATION '/tmp/vt';
> INSERT INTO t VALUES
>   (1, parse_json('{"x":1,"price":3,"name":"x"}'), parse_json('{"y":2}')),
>   (2, parse_json('{"x":9,"price":1,"name":"z"}'), parse_json('{"y":8}'));
> CREATE OR REPLACE TEMPORARY VIEW tv    USING parquet OPTIONS (path '/tmp/vt');
> CREATE OR REPLACE TEMPORARY VIEW codes USING parquet OPTIONS (path '/tmp/vt');
> -- All four crash with: [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#NN in [...]
> SELECT variant_get(v1, '$.x', 'int') FROM tv;
> SELECT variant_get(v1, '$.name', 'string') AS nm
>   FROM tv ORDER BY variant_get(v1, '$.price', 'int');
> SELECT max(variant_get(v1, '$.price', 'int')) FROM tv;
> SELECT l.a FROM tv l JOIN codes r
>   ON variant_get(l.v1, '$.x', 'int') = variant_get(r.v1, '$.x', 'int');
> {code}
> Sample failure:
> {code:java}
> [INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#21 in [a#33,v1#34,v2#35]. SQLSTATE: XX000
>   at org.apache.spark.sql.catalyst.expressions.BindReferences$.attributeNotFoundException(BoundAttribute.scala:109)
>   ...
> {code}
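
The step ordering behind Issue 1 can be illustrated with a toy model. This is a hypothetical Python sketch, not Spark code. The classes `ScanBuilderHolder` and `BuiltScan` and the step functions only mimic the rule names above. `plan_scan` and its final "build whatever holder survived" step are invented stand-ins. The sketch assumes that each step rewrites only the node type it matches.

```python
# Hypothetical toy model of the step ordering in Issue 1 (not Spark code).
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple

Extraction = Tuple[str, str]  # (column, JSON path)


@dataclass
class ScanBuilderHolder:
    """Pre-build stand-in: still accepts pushdowns and pruning."""
    table_columns: List[str]
    pushed_variants: List[Extraction] = field(default_factory=list)
    pruned_to: Optional[List[str]] = None

    def build(self) -> "BuiltScan":
        cols = self.pruned_to if self.pruned_to is not None else self.table_columns
        return BuiltScan(list(cols), list(self.pushed_variants))


@dataclass
class BuiltScan:
    """Stand-in for the scan returned by builder.build()."""
    read_columns: List[str]
    pushed_variants: List[Extraction]


def push_down_variants(node, extractions: List[Extraction]):
    # Accepted pushdown: record the extractions on the holder.
    if isinstance(node, ScanBuilderHolder):
        node.pushed_variants.extend(extractions)
    return node


def build_scan_with_pushed_variants(node):
    # Builds the scan early and replaces the holder when something was pushed.
    if isinstance(node, ScanBuilderHolder) and node.pushed_variants:
        return node.build()
    return node


def prune_columns(node, referenced: Set[str]):
    # Matches the holder only; an already-built scan passes through unchanged.
    if isinstance(node, ScanBuilderHolder):
        node.pruned_to = [c for c in node.table_columns if c in referenced]
    return node


def plan_scan(extractions: List[Extraction], referenced: Set[str]) -> BuiltScan:
    node = ScanBuilderHolder(["a", "v1", "v2"])
    node = push_down_variants(node, extractions)
    node = build_scan_with_pushed_variants(node)
    node = prune_columns(node, referenced)
    # Hypothetical final step: build any holder that survived.
    return node.build() if isinstance(node, ScanBuilderHolder) else node


# SELECT variant_get(v1, '$.x', 'int') FROM tv references only v1.
print(plan_scan([("v1", "$.x")], {"v1"}).read_columns)  # ['a', 'v1', 'v2']
print(plan_scan([], {"v1"}).read_columns)               # ['v1']
```

On the accepted path the scan still reads `a` and `v2`, even though the query references only `v1`. On the declined path the holder survives and pruning applies, which matches the behavior described in Issue 1.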

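The clobbering in Issue 2 can be sketched in the same way. This is again hypothetical Python, not Spark code. `ToyScanBuilder` only mimics the overwrite-on-every-call behavior reported for ParquetScanBuilder. The `handled` set is a simplifying assumption that explains how the second, bare-holder match ends up picking only the unreferenced sibling `v2`.

```python
# Hypothetical toy model of Issue 2 (not Spark code).
from dataclasses import dataclass, field
from typing import List, Set, Tuple

Request = Tuple[str, str]  # (column, JSON path)


class ToyScanBuilder:
    """Overwrites its state on every push, as reported for ParquetScanBuilder."""

    def __init__(self) -> None:
        self.pushed: List[Request] = []

    def push(self, requests: List[Request]) -> None:
        self.pushed = list(requests)  # overwrite, not append


@dataclass
class Holder:
    variant_columns: List[str]
    builder: ToyScanBuilder
    handled: Set[str] = field(default_factory=set)


@dataclass
class Project:
    extractions: List[Request]
    child: Holder


def push_down_variants_rule(node):
    if isinstance(node, Project):
        # First match: push the extraction the projection actually uses.
        node.child.builder.push(node.extractions)
        node.child.handled.update(col for col, _ in node.extractions)
    elif isinstance(node, Holder):
        # Second match on the bare holder: the remaining VARIANT columns look
        # like whole-variant ("$") requests and are pushed again.
        siblings = [c for c in node.variant_columns if c not in node.handled]
        if siblings:
            node.builder.push([(c, "$") for c in siblings])
    return node


def transform_down(node, rule):
    """Pre-order rewrite: apply the rule, then recurse into the child,
    even when the rule returned the node unchanged."""
    node = rule(node)
    if isinstance(node, Project):
        node.child = transform_down(node.child, rule)
    return node


builder = ToyScanBuilder()
plan = Project(extractions=[("v1", "$.x")],
               child=Holder(variant_columns=["v1", "v2"], builder=builder))
transform_down(plan, push_down_variants_rule)
print(builder.pushed)  # [('v2', '$')]: the v1 extraction was clobbered
```

The final builder state mirrors `PushedVariantExtractions: [v2:"$":VariantType]` in the broken plan above. As a result, the projection's `v1` extraction is left with nothing to bind to.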

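The ORDER BY symptom in Issue 3 can be shown in miniature using the two rows from the repro. This is a hypothetical Python sketch, not Spark code. It models the placeholder as a constant `False` for every row, which is an assumption: the report says only that the slot collapses to a boolean placeholder.

```python
# Hypothetical model of the Issue 3 ORDER BY symptom (not Spark code).
rows = [
    {"name": "x", "price": 3},  # row a = 1 in the repro
    {"name": "z", "price": 1},  # row a = 2 in the repro
]


def price_key(row):
    # What ORDER BY variant_get(v1, '$.price', 'int') should sort on.
    return row["price"]


def placeholder_key(row):
    # Assumed placeholder: the same boolean for every row, so it carries no price.
    return False


expected = [r["name"] for r in sorted(rows, key=price_key)]
observed = [r["name"] for r in sorted(rows, key=placeholder_key)]
print(expected)  # ['z', 'x']
print(observed)  # ['x', 'z']  (every key ties, so input order is kept)
```

Because every placeholder key ties, the output is simply the order in which rows arrive. Here that is input order, since Python's sort is stable. The result looks plausible but is silently wrong.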

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
