andygrove commented on issue #63:
URL: https://github.com/apache/arrow-datafusion/issues/63#issuecomment-846413793
Here is some additional information. When I run TPC-H query 5 in the
benchmarks against DataFusion, I see that the physical plan uses partitioned
joins.
For example, I see that both inputs to the join are partitioned on the join
keys, and the join mode is `Partitioned`.
```
HashJoinExec: mode=Partitioned, join_type=Inner, on=[("c_custkey", "o_custkey")]
  RepartitionExec: partitioning=Hash([Column { name: "c_custkey" }], 24)
    ParquetExec: batch_size=8192, limit=None, partitions=[...]
  RepartitionExec: partitioning=Hash([Column { name: "o_custkey" }], 24)
    FilterExec: o_orderdate >= CAST(1994-01-01 AS Date32) AND o_orderdate < CAST(1995-01-01 AS Date32)
      ParquetExec: batch_size=8192, limit=None, partitions=[...]
```
This means that the join can run in parallel because the inputs are
partitioned. So partition 1 of the join reads partition 1 of the left and right
inputs, and so on.
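To make the pattern concrete, here is a minimal sketch of a partitioned hash join using only the Rust standard library. It is not DataFusion's implementation: `Row`, `partition_of`, `repartition`, `join_partition` and `partitioned_join` are hypothetical names invented for illustration, and the rows are made-up sample data. The point is only that once both sides are hashed on the join key into the same number of partitions, matching keys always land in the same partition index, so each partition can be joined independently.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Hypothetical row type for illustration: (join key, payload).
type Row = (u64, &'static str);

/// Assign a join key to one of `n` partitions by hashing it.
fn partition_of(key: u64, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Split rows into `n` partitions by the hash of the join key
/// (the role RepartitionExec plays in the plan above).
fn repartition(rows: &[Row], n: usize) -> Vec<Vec<Row>> {
    let mut parts: Vec<Vec<Row>> = vec![Vec::new(); n];
    for &row in rows {
        parts[partition_of(row.0, n)].push(row);
    }
    parts
}

/// Inner hash join of one left partition with the matching right partition.
fn join_partition(left: &[Row], right: &[Row]) -> Vec<(u64, &'static str, &'static str)> {
    // Build a hash table from this partition's slice of the left side only.
    let mut table: HashMap<u64, Vec<&'static str>> = HashMap::new();
    for &(key, value) in left {
        table.entry(key).or_default().push(value);
    }
    // Probe with this partition's slice of the right side.
    let mut out = Vec::new();
    for &(key, right_value) in right {
        if let Some(left_values) = table.get(&key) {
            for &left_value in left_values {
                out.push((key, left_value, right_value));
            }
        }
    }
    out
}

/// Partition i of the join reads partition i of the left and right inputs.
/// The loop is sequential to keep the sketch short; the iterations do not
/// share state, so each one could run on its own thread.
fn partitioned_join(left: &[Row], right: &[Row], n: usize) -> Vec<(u64, &'static str, &'static str)> {
    let left_parts = repartition(left, n);
    let right_parts = repartition(right, n);
    let mut out = Vec::new();
    for i in 0..n {
        out.extend(join_partition(&left_parts[i], &right_parts[i]));
    }
    out.sort(); // deterministic order for display and comparison
    out
}

fn main() {
    // Made-up sample data standing in for customer and orders.
    let customers: Vec<Row> = vec![(1, "alice"), (2, "bob"), (3, "carol")];
    let orders: Vec<Row> = vec![(1, "o-100"), (1, "o-101"), (3, "o-102"), (4, "o-103")];
    for row in partitioned_join(&customers, &orders, 4) {
        println!("{:?}", row);
    }
}
```

The result does not depend on the partition count: joining with 1 partition or 24 yields the same rows, only the amount of available parallelism changes.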
When I run the same query against Ballista, I see:
```
HashJoinExec: mode=CollectLeft, join_type=Inner, on=[("c_custkey", "o_custkey")]
  ParquetExec: batch_size=8192, limit=None, partitions=[...]
  FilterExec: o_orderdate >= CAST(1994-01-01 AS Date32) AND o_orderdate < CAST(1995-01-01 AS Date32)
    ParquetExec: batch_size=8192, limit=None, partitions=[
```
Here, we see join mode `CollectLeft`, which means that each partition being
executed will go and fetch the entire left side of the join into memory. This
is very inefficient in terms of both memory and compute, and it potentially
gets much slower the more partitions we have, because the whole left side is
fetched once per partition.
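As a rough illustration of the cost difference, the sketch below counts only how many left-side (build) rows are loaded in total under each mode, following the description above. It measures nothing in Ballista itself: the function names are hypothetical, the 150,000-row left side is an assumed figure chosen for illustration, and 24 is the partition count from the DataFusion plan.

```rust
/// Total build-side rows loaded when every partition fetches the entire
/// left side, as described for `CollectLeft` above.
fn collect_left_build_rows(left_rows: u64, partitions: u64) -> u64 {
    left_rows * partitions
}

/// Total build-side rows loaded when the left side is hash-partitioned:
/// each row lands in exactly one partition, so the total is unchanged.
fn partitioned_build_rows(left_rows: u64, _partitions: u64) -> u64 {
    left_rows
}

fn main() {
    let left_rows = 150_000; // assumed size of the left input, for illustration
    let partitions = 24; // partition count shown in the DataFusion plan
    println!(
        "CollectLeft: {} rows loaded in total",
        collect_left_build_rows(left_rows, partitions)
    );
    println!(
        "Partitioned: {} rows loaded in total",
        partitioned_build_rows(left_rows, partitions)
    );
}
```

Under these assumptions the `CollectLeft` total is 24 times the partitioned total, and the per-partition memory footprint is the full left side rather than roughly one twenty-fourth of it.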
What we need to do is apply the same "partitioned hash join" pattern to
Ballista.