berkaysynnada commented on PR #14411:
URL: https://github.com/apache/datafusion/pull/14411#issuecomment-2646154037
> * No performance regression (as the benchmarks already showed)
> * Reduced memory footprint for queries where batches can accumulate in `RepartitionExec` (as the original issue said)
>
> I tried to check the memory usage for `tpch-sf10` and `clickbench`, and there is no noticeable change for those queries. Perhaps we should construct queries with this anti-pattern, and demonstrate that memory usage can actually be reduced by this on-demand repartition executor?
Hi @2010YOUY01. First of all, thank you for this investigation. I would actually expect higher memory consumption with the current `RepartitionExec`, especially in systems where the part upstream of the `RepartitionExec` produces results faster than the downstream part consumes them (which is generally the case). But, as @Weijun-H asked, did you enable the configuration for on-demand repartitioning? I'm indirectly answering your second question below.
> ### Question
> In my understanding, the new repartition executor is a wrapper on `RepartitionExec`. To enable lazy evaluation, it should support both `RoundRobin` and `Hash` repartition, right? This PR only swapped `RoundRobin`; do you also plan to add on-demand hash repartition in the future?
This new repartition is **not** a wrapper or merely an extension. It uses a
completely different mechanism. In both round-robin and hash repartitioning,
upstream polling is independent of downstream polling. That means:
```
FooExec
--RepartitionExec(hash or round-robin)
----BarExec
```
After you call `execute()` on `FooExec`, even if you never poll `FooStream`, `BarStream` is continuously polled until the channels inside `RepartitionExec` are full. The repartitioning type only matters when `FooStream` is polled, as it determines which data is sent to which partition.
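To make that concrete, here is a minimal sketch of the eager pattern. This is not the actual `RepartitionExec` code; the `Batch` alias, the channel capacity, and the round-robin driver below are simplified stand-ins:

```rust
use tokio::sync::mpsc;

// Stand-in for arrow's RecordBatch; purely illustrative.
type Batch = Vec<u64>;

// Eager pattern: `execute()` spawns a driving task immediately. The input
// is drained into bounded per-partition channels whether or not any output
// stream has been polled; draining pauses only when the channel the next
// batch is routed to is at capacity.
fn execute_eager(
    num_partitions: usize,
    mut input: mpsc::Receiver<Batch>,
) -> Vec<mpsc::Receiver<Batch>> {
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
        .map(|_| mpsc::channel::<Batch>(8))
        .unzip();

    tokio::spawn(async move {
        let mut next = 0;
        while let Some(batch) = input.recv().await {
            // Round-robin routing; `send` only awaits when the target
            // channel is full.
            if senders[next].send(batch).await.is_err() {
                break; // the receiving side was dropped
            }
            next = (next + 1) % num_partitions;
        }
    });

    receivers
}
```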
In our new design, we don't guarantee any particular ordering of the data. All partitions can be polled at any time, and whichever one polls first gets the first available data (a natural form of load balancing). On the upstream side of the on-demand repartition, we don't fully drain the input streams; we prefetch only one batch to avoid introducing latency. (When a poll occurs, it retrieves that prefetched data and continues processing, while the repartition process immediately starts fetching a replacement for the data that was just sent.) BTW, this prefetch behavior can be adjusted: it can be either just one batch (buffering happens on the upstream side of the channel) or `n * partition_count` batches (buffering happens on the downstream side of the channels). We haven't decided yet (waiting for benchmarks).
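For contrast, a sketch of the on-demand mechanism with a one-batch prefetch. Again, this is hypothetical code rather than what the PR implements; the demand channel and the `Batch` alias are assumptions for exposition:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for arrow's RecordBatch; purely illustrative.
type Batch = Vec<u64>;

// On-demand pattern: nothing is pulled from the input until some output
// partition sends a request. One batch is prefetched so a requester never
// waits for the upstream to produce from scratch, and as soon as a batch
// is handed out, the driver starts fetching its replacement.
fn execute_on_demand(
    mut input: mpsc::Receiver<Batch>,
) -> mpsc::Sender<oneshot::Sender<Option<Batch>>> {
    // Each request carries a one-shot channel on which the batch (or
    // `None`, once the input is exhausted) is returned.
    let (demand_tx, mut demand_rx) =
        mpsc::channel::<oneshot::Sender<Option<Batch>>>(1);

    tokio::spawn(async move {
        // Prefetch exactly one batch ("buffering on the upstream side").
        let mut prefetched = input.recv().await;
        while let Some(reply) = demand_rx.recv().await {
            let _ = reply.send(prefetched.take());
            // Immediately start replacing the batch we just handed out.
            prefetched = input.recv().await;
        }
    });

    demand_tx
}

// Any output partition asks for data like this; whichever partition asks
// first gets the first available batch:
//
//   let (tx, rx) = oneshot::channel();
//   demand_tx.send(tx).await.unwrap();
//   let maybe_batch: Option<Batch> = rx.await.unwrap();
```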