Yuming Wang created SPARK-37915:
-----------------------------------
Summary: Push down deterministic projection through SQL UNION
Key: SPARK-37915
URL: https://issues.apache.org/jira/browse/SPARK-37915
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.3.0
Reporter: Yuming Wang
{code:scala}
spark.range(11).selectExpr("cast(id as decimal(18, 1)) as a", "id as b", "id as c").write.saveAsTable("t1")
spark.range(12).selectExpr("cast(id as decimal(18, 2)) as a", "id as b", "id as c").write.saveAsTable("t2")
spark.range(13).selectExpr("cast(id as decimal(18, 3)) as a", "id as b", "id as c").write.saveAsTable("t3")
spark.range(14).selectExpr("cast(id as decimal(18, 4)) as a", "id as b", "id as c").write.saveAsTable("t4")
spark.range(15).selectExpr("cast(id as decimal(18, 5)) as a", "id as b", "id as c").write.saveAsTable("t5")
sql("select a from t1 union select a from t2 union select a from t3 union select a from t4 union select a from t5").explain(true)
{code}
Current: each union keeps its own distinct aggregation (a HashAggregate/Exchange/HashAggregate pair), because the type-widening Project inserted between the nested Unions prevents them from being combined:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#76], functions=[], output=[a#76])
   +- Exchange hashpartitioning(a#76, 5), ENSURE_REQUIREMENTS, [id=#159]
      +- HashAggregate(keys=[a#76], functions=[], output=[a#76])
         +- Union
            :- HashAggregate(keys=[a#74], functions=[], output=[a#76])
            :  +- Exchange hashpartitioning(a#74, 5), ENSURE_REQUIREMENTS, [id=#154]
            :     +- HashAggregate(keys=[a#74], functions=[], output=[a#74])
            :        +- Union
            :           :- HashAggregate(keys=[a#72], functions=[], output=[a#74])
            :           :  +- Exchange hashpartitioning(a#72, 5), ENSURE_REQUIREMENTS, [id=#149]
            :           :     +- HashAggregate(keys=[a#72], functions=[], output=[a#72])
            :           :        +- Union
            :           :           :- HashAggregate(keys=[a#70], functions=[], output=[a#72])
            :           :           :  +- Exchange hashpartitioning(a#70, 5), ENSURE_REQUIREMENTS, [id=#144]
            :           :           :     +- HashAggregate(keys=[a#70], functions=[], output=[a#70])
            :           :           :        +- Union
            :           :           :           :- Project [cast(a#55 as decimal(19,2)) AS a#70]
            :           :           :           :  +- FileScan parquet default.t1[a#55] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,1)>
            :           :           :           +- Project [cast(a#58 as decimal(19,2)) AS a#71]
            :           :           :              +- FileScan parquet default.t2[a#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,2)>
            :           :           +- Project [cast(a#61 as decimal(20,3)) AS a#73]
            :           :              +- FileScan parquet default.t3[a#61] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,3)>
            :           +- Project [cast(a#64 as decimal(21,4)) AS a#75]
            :              +- FileScan parquet default.t4[a#64] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,4)>
            +- Project [cast(a#67 as decimal(22,5)) AS a#77]
               +- FileScan parquet default.t5[a#67] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,5)>
{noformat}
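For contrast (a hypothetical example, not part of the original report): when all branches already share the same type, no widening Project is inserted between the unions, and the plan collapses into a single Union with one distinct aggregation. The same collapse is what this issue asks for in the decimal case:
{code:scala}
// All branches are bigint, so union type coercion inserts no casts
// and the nested unions flatten into a single Union.
spark.range(11).selectExpr("id as b").write.saveAsTable("s1")
spark.range(12).selectExpr("id as b").write.saveAsTable("s2")
spark.range(13).selectExpr("id as b").write.saveAsTable("s3")
sql("select b from s1 union select b from s2 union select b from s3").explain(true)
{code}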
Expected: push the deterministic cast projections down through the Union onto the scans, so the whole query flattens into a single Union with a single distinct aggregation:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#76], functions=[], output=[a#76])
   +- Exchange hashpartitioning(a#76, 5), ENSURE_REQUIREMENTS, [id=#111]
      +- HashAggregate(keys=[a#76], functions=[], output=[a#76])
         +- Union
            :- Project [cast(cast(cast(cast(a#55 as decimal(19,2)) as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS a#76]
            :  +- FileScan parquet default.t1[a#55] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,1)>
            :- Project [cast(cast(cast(cast(a#58 as decimal(19,2)) as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS a#89]
            :  +- FileScan parquet default.t2[a#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,2)>
            :- Project [cast(cast(cast(a#61 as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS a#87]
            :  +- FileScan parquet default.t3[a#61] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,3)>
            :- Project [cast(cast(a#64 as decimal(21,4)) as decimal(22,5)) AS a#84]
            :  +- FileScan parquet default.t4[a#64] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,4)>
            +- Project [cast(a#67 as decimal(22,5)) AS a#77]
               +- FileScan parquet default.t5[a#67] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,5)>
{noformat}
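The optimizer already has a PushProjectionThroughUnion rule for a deterministic Project sitting directly on top of a Union; in the plans above, the distinct aggregation additionally sits in between. As a rough illustration of the push-down idea only (the object name is hypothetical and this is not the actual Spark implementation), a rule of this shape rewrites the project list against each child's output and moves it below the Union:
{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical sketch: push a deterministic Project into each Union child
// so that CombineUnions can later flatten the nested unions.
object PushProjectThroughUnionSketch extends Rule[LogicalPlan] {

  // Re-express an expression over the Union's output in terms of one
  // child's output, matching attributes positionally by exprId.
  private def pushToChild(
      e: NamedExpression, union: Union, child: LogicalPlan): NamedExpression = {
    val rewrites = union.output.map(_.exprId).zip(child.output).toMap
    e.transform {
      case a: Attribute => rewrites.getOrElse(a.exprId, a)
    }.asInstanceOf[NamedExpression]
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Project(projectList, u: Union) if projectList.forall(_.deterministic) =>
      u.copy(children = u.children.map { child =>
        Project(projectList.map(pushToChild(_, u, child)), child)
      })
  }
}
{code}
Once the casts sit directly above the scans, the nested unions can be combined and the duplicated distinct aggregations collapse into the single HashAggregate/Exchange pair shown in the expected plan.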