wangxiaoying commented on issue #423:
URL:
https://github.com/apache/incubator-wayang/issues/423#issuecomment-2043243095
> 1. The type of join that Spark SQL uses. Wayang's current join operator maps to the corresponding join in RDDs, which if I'm not mistaken is implemented as a hash join. Maybe Spark SQL uses a broadcast join and thus, the difference in the data transferred?
Yes, I think the join algorithms executed by the two approaches are different. Below is the default physical plan generated by Spark:
```
+- == Final Plan ==
*(8) Sort [revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST], true, 0
+- AQEShuffleRead coalesced
   +- ShuffleQueryStage 5
      +- Exchange rangepartitioning(revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=448]
         +- *(7) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[sum((l_extendedprice#5 * (1 - l_discount#6)))])
            +- *(7) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[partial_sum((l_extendedprice#5 * (1 - l_discount#6)))])
               +- *(7) Project [o_orderdate#58, o_shippriority#61, l_orderkey#0, l_extendedprice#5, l_discount#6]
                  +- *(7) SortMergeJoin [o_orderkey#54], [l_orderkey#0], Inner
                     :- *(5) Sort [o_orderkey#54 ASC NULLS FIRST], false, 0
                     :  +- AQEShuffleRead coalesced
                     :     +- ShuffleQueryStage 4
                     :        +- Exchange hashpartitioning(o_orderkey#54, 200), ENSURE_REQUIREMENTS, [plan_id=341]
                     :           +- *(4) Project [o_orderkey#54, o_orderdate#58, o_shippriority#61]
                     :              +- *(4) BroadcastHashJoin [c_custkey#36], [o_custkey#55], Inner, BuildLeft, false
                     :                 :- BroadcastQueryStage 3
                     :                 :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=225]
                     :                 :     +- AQEShuffleRead local
                     :                 :        +- ShuffleQueryStage 0
                     :                 :           +- Exchange hashpartitioning(c_custkey#36, 200), ENSURE_REQUIREMENTS, [plan_id=132]
                     :                 :              +- *(1) Project [c_custkey#36]
                     :                 :                 +- *(1) Filter (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, c_mktsegment#42, 10, true, false, true) = BUILDING )
                     :                 :                    +- *(1) Scan JDBCRelation(public.customer) [numPartitions=1] [c_custkey#36,c_mktsegment#42] PushedFilters: [*IsNotNull(c_custkey)], ReadSchema: struct<c_custkey:int,c_mktsegment:string>
                     :                 +- AQEShuffleRead local
                     :                    +- ShuffleQueryStage 1
                     :                       +- Exchange hashpartitioning(o_custkey#55, 200), ENSURE_REQUIREMENTS, [plan_id=139]
                     :                          +- *(2) Scan JDBCRelation(public.orders) [numPartitions=1] [o_orderkey#54,o_custkey#55,o_orderdate#58,o_shippriority#61] PushedFilters: [*IsNotNull(o_orderdate), *LessThan(o_orderdate,1995-03-15), *IsNotNull(o_custkey), *IsNotNull(o_..., ReadSchema: struct<o_orderkey:int,o_custkey:int,o_orderdate:date,o_shippriority:int>
                     +- *(6) Sort [l_orderkey#0 ASC NULLS FIRST], false, 0
                        +- AQEShuffleRead coalesced
                           +- ShuffleQueryStage 2
                              +- Exchange hashpartitioning(l_orderkey#0, 200), ENSURE_REQUIREMENTS, [plan_id=150]
                                 +- *(3) Scan JDBCRelation(public.lineitem) [numPartitions=1] [l_orderkey#0,l_extendedprice#5,l_discount#6] PushedFilters: [*IsNotNull(l_shipdate), *GreaterThan(l_shipdate,1995-03-15), *IsNotNull(l_orderkey)], ReadSchema: struct<l_orderkey:int,l_extendedprice:decimal(15,2),l_discount:decimal(15,2)>
```
I tried adding the config `.config("spark.sql.join.preferSortMergeJoin", "false")` when building the Spark session, so that the `SortMergeJoin` above becomes a `HashJoin`, but the performance does not change much. A `BroadcastJoin` is still used, though.
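For completeness, here is a sketch (not the exact setup used in my measurements) of how the join strategy can be steered from the session config and per query. Setting `spark.sql.autoBroadcastJoinThreshold` to `-1` disables automatic broadcast joins entirely, which `preferSortMergeJoin` alone does not do; Spark 3.0+ also accepts join hints that override the planner:

```scala
import org.apache.spark.sql.SparkSession

// Sketch, assuming a local Spark 3.x setup: with the broadcast threshold
// disabled, the planner can no longer pick BroadcastHashJoin on its own,
// so the two non-broadcast join strategies can be compared in isolation.
val spark = SparkSession.builder()
  .appName("join-strategy-test") // hypothetical app name
  .master("local[*]")
  // -1 disables automatic broadcast hash joins entirely
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  // with broadcasts off, prefer ShuffledHashJoin over SortMergeJoin
  .config("spark.sql.join.preferSortMergeJoin", "false")
  .getOrCreate()

// Alternatively, a per-query join hint (Spark 3.0+) overrides the planner;
// table names here are placeholders for registered views:
val df = spark.sql(
  """SELECT /*+ SHUFFLE_HASH(lineitem) */ *
    |FROM orders JOIN lineitem ON o_orderkey = l_orderkey""".stripMargin)
df.explain() // the physical plan should now show ShuffledHashJoin
```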
> 2. I'm not very familiar with the views in Spark, but when one registers the temporary views are they materialized in memory? If so, the timer you have would measure data accessed via memory. But again not sure how the temp views in Spark work. Maybe you could time the registerviews method to check this out.
Spark uses lazy evaluation, so the view creation does not take much time (only some metadata is fetched). And as I have shown above in the Postgres log, Spark does fetch the three tables (with projection and filter pushdown) at runtime, just like Wayang does.
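The laziness is easy to confirm with a timer around the registration call. A minimal sketch (JDBC URL and table name are placeholders): registering the view is near-instant because only the schema is fetched, while the actual JDBC scan against Postgres happens when an action runs:

```scala
// Assumes the `spark` session from above and a reachable Postgres instance.
val ordersDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/tpch") // assumed URL
  .option("dbtable", "public.orders")
  .load() // eagerly fetches only the schema, not the rows

val t0 = System.nanoTime()
ordersDf.createOrReplaceTempView("orders") // metadata only, no data moves
val t1 = System.nanoTime()

spark.sql("SELECT count(*) FROM orders").show() // JDBC fetch happens here
val t2 = System.nanoTime()

println(s"register view: ${(t1 - t0) / 1e6} ms, action: ${(t2 - t1) / 1e6} ms")
```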
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]