[ https://issues.apache.org/jira/browse/SPARK-57499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Qiegang Long updated SPARK-57499:
---------------------------------
Description:
There are two issues with variant extraction pushdown in DSv2.
h3. Issue 1: column pruning is skipped when variant pushdown is accepted
{{V2ScanRelationPushDown}} runs pushdown steps in order:
{code:java}
pushDownVariants            // records extraction on ScanBuilderHolder
...
buildScanWithPushedVariants // calls builder.build(), replaces ScanBuilderHolder
pruneColumns                // matches ScanBuilderHolder only: no-op, holder is gone
{code}
{*}builder.pruneColumns() is never called{*}, so the scan reads the full table
schema, including unreferenced columns. This is most expensive for unreferenced
VARIANT columns: each one is fully reconstructed from its shredded Parquet tree
on every row instead of being pruned.
This only affects the accepted pushdown path. When pushdown is declined or
disabled, the {{ScanBuilderHolder}} survives and {{pruneColumns}} runs normally.
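The ordering problem in Issue 1 can be illustrated with a toy model. This is only a sketch in Python, not Spark code: {{ToyScanBuilder}}, {{ToyHolder}}, {{ToyScan}} and the three step functions are hypothetical stand-ins named after the steps listed above, and the real rule operates on Catalyst plans rather than on these classes.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyScan:
    """Hypothetical built scan: just remembers which columns it reads."""
    read_schema: list


@dataclass
class ToyScanBuilder:
    """Hypothetical stand-in for a DSv2 scan builder (not Spark's API)."""
    table_schema: list
    pruned_schema: Optional[list] = None
    pushed_extractions: list = field(default_factory=list)

    def prune_columns(self, required):
        self.pruned_schema = [c for c in self.table_schema if c in required]

    def build(self):
        # Without a prior prune_columns call the full table schema is read.
        schema = self.table_schema if self.pruned_schema is None else self.pruned_schema
        return ToyScan(list(schema))


@dataclass
class ToyHolder:
    """Hypothetical placeholder node that later steps pattern-match on."""
    builder: ToyScanBuilder


def push_down_variants(plan, extractions, accept):
    # Records the extraction on the holder's builder when the source accepts it.
    if isinstance(plan, ToyHolder) and accept:
        plan.builder.pushed_extractions = list(extractions)
    return plan


def build_scan_with_pushed_variants(plan):
    # Accepted pushdown: build immediately, which replaces the holder.
    if isinstance(plan, ToyHolder) and plan.builder.pushed_extractions:
        return plan.builder.build()
    return plan


def prune_columns(plan, required):
    # Only matches a holder; anything else passes through untouched.
    if isinstance(plan, ToyHolder):
        plan.builder.prune_columns(required)
        return plan.builder.build()
    return plan


def optimize(accept):
    plan = ToyHolder(ToyScanBuilder(["a", "v1", "v2"]))
    plan = push_down_variants(plan, ["v1:$.x:int"], accept)
    plan = build_scan_with_pushed_variants(plan)
    plan = prune_columns(plan, {"v1"})
    return plan.read_schema


accepted_schema = optimize(accept=True)
declined_schema = optimize(accept=False)
print(accepted_schema)  # ['a', 'v1', 'v2']
print(declined_schema)  # ['v1']
```

In the toy model the accepted path reads all three columns because the holder has already been replaced by the time the pruning step runs, while the declined path prunes down to {{v1}}.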
h3. Issue 2: invalid plan on native Parquet V2
{{pushDownVariants}} uses {{transformDown}}, which recurses into the child
{{ScanBuilderHolder}} after the rule returns the plan unchanged. When the bare
{{ScanBuilderHolder}} matches {{PhysicalOperation}} a second time, the rule
collects unreferenced sibling VARIANT columns as full-variant requests and
pushes them to the builder. {{ParquetScanBuilder}} overwrites its state on every
call, so the second push clobbers the correct extraction from the first. The
result is a dangling ExprId in the projection and a *runtime crash:*
{code:java}
[INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND] Could not find v1#57 in [a#72,v1#73,v2#74]
{code}
The broken plan:
{code:java}
*(1) !Project [variant_get(v1#57, $.x) ...]
+- BatchScan parquet [a#66, v1#67, v2#68]
PushedVariantExtractions: [v2:"$":VariantType]
{code}
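The clobbering described in Issue 2 can be sketched with a small Python toy model. Everything here is hypothetical ({{ToyBuilder}}, {{ToyHolder}}, {{ToyProject}}, and the simplified rule); in particular, marking an extracted column as {{struct}} after a successful push is an assumption of the sketch, made so that the second visit only sees {{v2}} as a VARIANT column. The {{recurse=False}} run is included for contrast only and is not meant to represent the actual fix.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBuilder:
    """Hypothetical builder that overwrites its state on every push."""
    pushed: list = field(default_factory=list)

    def push(self, requests):
        self.pushed = list(requests)  # overwrite, not merge


@dataclass
class ToyHolder:
    """Hypothetical leaf node: output maps column name -> type name."""
    output: dict
    builder: ToyBuilder
    children: tuple = ()


@dataclass
class ToyProject:
    """Hypothetical projection: extractions maps variant column -> path."""
    extractions: dict
    child: ToyHolder

    @property
    def children(self):
        return (self.child,)


def push_down_variants(node):
    if isinstance(node, ToyProject):
        # First match: only the paths the projection actually uses.
        holder = node.child
        requests = sorted(node.extractions.items())
    elif isinstance(node, ToyHolder):
        # Bare holder: no projection, so every remaining VARIANT column
        # is requested as a whole value ("$").
        holder = node
        requests = [(c, "$") for c, t in holder.output.items() if t == "variant"]
    else:
        return node
    if requests:
        holder.builder.push(requests)
        for col, _ in requests:
            holder.output[col] = "struct"  # assumption of this sketch
    return node  # plan returned unchanged; only builder state was mutated


def transform_down(node, rule, recurse=True):
    # Apply the rule to the node first, then descend into its children.
    node = rule(node)
    if recurse:
        for child in node.children:
            transform_down(child, rule, recurse)
    return node


def run(recurse):
    builder = ToyBuilder()
    holder = ToyHolder({"a": "int", "v1": "variant", "v2": "variant"}, builder)
    plan = ToyProject({"v1": "$.x"}, holder)
    transform_down(plan, push_down_variants, recurse)
    return builder.pushed


clobbered = run(recurse=True)
single_visit = run(recurse=False)
print(clobbered)     # [('v2', '$')]
print(single_visit)  # [('v1', '$.x')]
```

With recursion the builder ends up holding only the {{v2}} full-variant request, which mirrors the {{PushedVariantExtractions}} line in the broken plan; the projection's {{v1}} extraction is lost.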
h3. Reproduce (stock spark-4.1.2-bin-hadoop3)
Use a path-based view to force DSv2:
{code:sql}
SET spark.sql.sources.useV1SourceList = "";
CREATE TABLE t (a INT, v1 VARIANT, v2 VARIANT) USING PARQUET LOCATION '/tmp/vt';
INSERT INTO t VALUES (1, parse_json('{"x":1}'), parse_json('{"y":2}'));
CREATE OR REPLACE TEMPORARY VIEW tv USING parquet OPTIONS (path '/tmp/vt');
SELECT variant_get(v1, '$.x', 'int') FROM tv;
{code}
*The final SELECT crashes with the INTERNAL_ERROR_ATTRIBUTE_NOT_FOUND error shown under Issue 2.*
> Variant extraction pushdown bypasses column pruning on DSv2 scans
> -----------------------------------------------------------------
>
> Key: SPARK-57499
> URL: https://issues.apache.org/jira/browse/SPARK-57499
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 4.1.0, 4.1.2
> Reporter: Qiegang Long
> Priority: Major
> Labels: pull-request-available
>