lianjunzhi created SPARK-34985:
----------------------------------
Summary: Different execution plans under JDBC and HDFS
Key: SPARK-34985
URL: https://issues.apache.org/jira/browse/SPARK-34985
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.1
Environment: spark 3.0.1
hive 2.1.1-cdh6.2.0
hadoop 3.0.0-cdh6.2.0
Reporter: lianjunzhi
Hive has two non-partitioned Parquet tables in the oms database, trade_order and trade_order_goods. trade_order has four columns: trade_id, company_id, is_delete, and trade_status; trade_order_goods has four columns: trade_id, cost, is_delete, and sell_total.
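For a self-contained reproduction, the two tables can be created roughly as follows. This is a sketch only: it assumes spark is a Hive-enabled SparkSession, and the column types of sell_total and trade_status are assumptions, since neither column appears with a type in the plans below.

{code:scala}
// Sketch: set up the two non-partitioned Parquet tables for the repro.
// The types of sell_total and trade_status are assumed; the others are
// taken from the ReadSchema entries in the plans below.
spark.sql("CREATE DATABASE IF NOT EXISTS oms")
spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS oms.trade_order (
    |  trade_id BIGINT,
    |  company_id BIGINT,
    |  is_delete INT,
    |  trade_status INT
    |) STORED AS PARQUET
    |""".stripMargin)
spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS oms.trade_order_goods (
    |  trade_id BIGINT,
    |  cost DECIMAL(18,2),
    |  is_delete INT,
    |  sell_total INT
    |) STORED AS PARQUET
    |""".stripMargin)
{code}

With the tables in place, run the following snippets: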
{code:scala}
val df = spark.sql(
  """
    |SELECT
    |  b.company_id,
    |  sum(a.cost) AS cost
    |FROM oms.trade_order_goods a
    |JOIN oms.trade_order b
    |  ON a.trade_id = b.trade_id
    |WHERE a.is_delete = 0 AND b.is_delete = 0
    |GROUP BY
    |  b.company_id
    |""".stripMargin)
{code}
{code:scala}
import org.apache.spark.sql.SaveMode

df.explain() // prints Physical Plan 1

df.write.insertInto("oms.test") // executes as Physical Plan 2

df.write
  .format("jdbc")
  .option("url", "")
  .option("dbtable", "test")
  .option("user", "")
  .option("password", "")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("truncate", value = true)
  .option("batchsize", 15000)
  .mode(SaveMode.Append)
  .save() // executes as Physical Plan 3
{code}
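To compare what each action actually executes (rather than what df.explain() prints up front), one option is to register a QueryExecutionListener and log the executed plan of every action. A sketch, assuming the same spark session as above:

{code:scala}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Sketch: print the plan that each action (insertInto, the JDBC save, ...)
// actually ran, so the write paths above can be compared at execution time.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"=== $funcName ===\n${qe.executedPlan}")

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"=== $funcName failed ===\n$exception")
})
{code}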
Physical Plan 1:
{noformat}
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[company_id#6L], functions=[sum(cost#2)])
   +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#40]
      +- HashAggregate(keys=[company_id#6L], functions=[partial_sum(cost#2)])
         +- Project [cost#2, company_id#6L]
            +- SortMergeJoin [trade_id#1L], [trade_id#5L], Inner
               :- Sort [trade_id#1L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#32]
               :     +- Project [trade_id#1L, cost#2]
               :        +- Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
               :           +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
               +- Sort [trade_id#5L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#33]
                     +- Project [trade_id#5L, company_id#6L]
                        +- Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
                           +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{noformat}
Physical Plan 2:
{noformat}
+- AdaptiveSparkPlan isFinalPlan=true
   +- *(6) HashAggregate(keys=[company_id#6L], functions=[sum(cost#2)], output=[company_id#6L, cost#28])
      +- CustomShuffleReader coalesced
         +- ShuffleQueryStage 2
            +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#244]
               +- *(5) HashAggregate(keys=[company_id#6L], functions=[partial_sum(cost#2)], output=[company_id#6L, sum#21])
                  +- *(5) Project [cost#2, company_id#6L]
                     +- *(5) SortMergeJoin [trade_id#1L], [trade_id#5L], Inner
                        :- *(3) Sort [trade_id#1L ASC NULLS FIRST], false, 0
                        :  +- CustomShuffleReader coalesced
                        :     +- ShuffleQueryStage 0
                        :        +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#119]
                        :           +- *(1) Project [trade_id#1L, cost#2]
                        :              +- *(1) Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
                        :                 +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
                        +- *(4) Sort [trade_id#5L ASC NULLS FIRST], false, 0
                           +- CustomShuffleReader coalesced
                              +- ShuffleQueryStage 1
                                 +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#126]
                                    +- *(2) Project [trade_id#5L, company_id#6L]
                                       +- *(2) Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
                                          +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{noformat}
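Note that Plans 1 and 2 describe the same query: Plan 2 is simply the final adaptive plan (isFinalPlan=true, with coalesced shuffle readers) after AQE has observed the shuffle stages, which is why its shape differs slightly from Plan 1. Both dumps imply adaptive execution was enabled, i.e. something like the following was in effect (an assumption about the session config, not stated in the report):

{code:scala}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
{code}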
Physical Plan 3:
{noformat}
Execute SaveIntoDataSourceCommand
+- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider@64ee110b, Map(url -> *********(redacted), truncate -> true, batchsize -> 15000, driver -> com.mysql.jdbc.Driver, dbtable -> test, user -> jkyun, password -> *********(redacted)), Append
   +- Aggregate [company_id#6L], [company_id#6L, sum(cost#2) AS cost#0]
      +- Filter ((is_delete#3 = 0) AND (is_delete#7 = 0))
         +- Join Inner, (trade_id#1L = trade_id#5L)
            :- SubqueryAlias a
            :  +- SubqueryAlias spark_catalog.oms.trade_order_goods
            :     +- Relation[trade_id#1L,cost#2,is_delete#3,sell_total#4] parquet
            +- SubqueryAlias b
               +- SubqueryAlias spark_catalog.oms.trade_order
                  +- Relation[trade_id#5L,company_id#6L,is_delete#7,trade_status#8] parquet
{noformat}
As the plans show, the HDFS write path (Plans 1 and 2) prunes columns and pushes the is_delete predicates down into the Parquet scans, while the JDBC write path (Plan 3) shows neither column pruning nor predicate pushdown: the Filter sits above the Join, and both relations are read with all of their columns, including sell_total and trade_status, which the query never references.
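One detail worth checking (a sketch; not verified against 3.0.1): the SubqueryAlias nodes in Physical Plan 3 normally exist only before optimization, which suggests the explain output for SaveIntoDataSourceCommand may be showing the analyzed child plan rather than the plan that actually runs. Comparing the two for the same DataFrame would show whether pruning and pushdown are really lost or merely not displayed:

{code:scala}
// Sketch: if the optimized plan already shows column pruning and the
// pushed-down is_delete filters, the issue is in what Plan 3 displays;
// if not, the JDBC write path truly skips those optimizations.
println("--- analyzed ---")
println(df.queryExecution.analyzed)
println("--- optimized ---")
println(df.queryExecution.optimizedPlan)
{code}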