[
https://issues.apache.org/jira/browse/ARROW-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255835#comment-17255835
]
Jorge Leitão edited comment on ARROW-11058 at 12/29/20, 6:30 AM:
-----------------------------------------------------------------
This aspect of DataFusion is a bit unclear atm: DataFusion seems to have two
types of "buckets", parts and batches, while Spark has only parts (via
partitioning). In Spark, the partitioning tradeoff is higher parallelism vs.
slower exchanges, but I can't find an equivalent tradeoff for the number of
batches per part in DataFusion.
6 months ago, my hypothesis was that partitioning would be used for
inter-process parallelism, while batches would be used for intra-process
parallelism. My idea at the time was: there is a stream of parts, and each part
is an iterator of batches; batch execution runs on Rayon, and each part is a
future in a stream (via Tokio, potentially on another machine). In this design,
the two "buckets" represent different kinds of parallelism: thread parallelism
and process parallelism (e.g. cross-machine), which on a single machine would
be served by two different thread pools.
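That two-level design can be sketched with the standard library alone (a
hedged model, not DataFusion's actual API: plain threads stand in for Tokio
tasks or remote machines, and `process_part` stands in for the Rayon-driven
batch loop):

```rust
use std::thread;

// A "batch" is modeled as a plain Vec of rows; a "part" is a Vec of batches.
type Batch = Vec<i64>;

// Batches within one part are consumed strictly in order: the next batch
// cannot start before the previous one finishes.
fn process_part(batches: Vec<Batch>) -> i64 {
    batches.iter().map(|b| b.iter().sum::<i64>()).sum()
}

fn main() {
    // P = 2 parts, each holding B = 2 batches.
    let parts: Vec<Vec<Batch>> = vec![
        vec![vec![1, 2], vec![3]],
        vec![vec![4], vec![5, 6]],
    ];

    // Parts run in parallel (threads stand in for Tokio tasks / machines),
    // so P is the unit of parallelism; B only shapes work inside a part.
    let handles: Vec<_> = parts
        .into_iter()
        .map(|p| thread::spawn(move || process_part(p)))
        .collect();

    let total: i64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    println!("{}", total); // 21
}
```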
But since we use a stream of batches and a stream of parts, I can't think of a
way to differentiate them. E.g. suppose we implement "coalesce batches": when
is using it expected to improve performance? Where should the optimizer insert
it?
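For concreteness, the coalescing step itself could look like the following
sketch (a hypothetical `coalesce_batches` helper, not the proposed
CoalesceBatchExec: it merges small batches until a target row count is
reached, so downstream operators see fewer, larger batches):

```rust
// A "batch" is modeled as a plain Vec of rows.
type Batch = Vec<i64>;

// Greedily merge consecutive batches until each output batch holds at
// least `target_rows` rows; a smaller remainder is emitted at the end.
fn coalesce_batches(input: Vec<Batch>, target_rows: usize) -> Vec<Batch> {
    let mut out = Vec::new();
    let mut current: Batch = Vec::new();
    for batch in input {
        current.extend(batch);
        if current.len() >= target_rows {
            out.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        out.push(current);
    }
    out
}

fn main() {
    // A selective filter can emit many tiny batches; recombine to >= 4 rows.
    let tiny = vec![vec![1], vec![2, 3], vec![4], vec![5, 6, 7]];
    let merged = coalesce_batches(tiny, 4);
    println!("{:?}", merged); // [[1, 2, 3, 4], [5, 6, 7]]
}
```

The open question in the text remains: this only pays off if per-batch
overhead dominates, which is exactly the tradeoff that is hard to state
without knowing what B is for.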
More quantitatively: given N rows, in Spark we can distribute them over P
parts, while in DataFusion we can distribute them over P parts and B batches.
In Spark, P controls parallelism in a very specific way (higher P => higher
parallelism and more tasks). In DataFusion, it is unclear how the tuple (P, B)
maps to either, and why we have both P and B in the first place (since both
run on the same thread pool and both are async).
AFAIK B is not necessarily related to vectorization, since vectorization (at
least at the CPU level) happens on much smaller chunks (the SIMD lane size).
Nor does B lead to higher parallelism: since batches are part of a stream,
there is no way to run two batches from a single part in parallel, as we need
to finish the execution of one before the next starts.
This aspect of parallelism in DataFusion was clear to me 6 months ago but
became unclear when we converted the RecordBatchReader to a stream.
> [Rust] [DataFusion] Implement "coalesce batches" operator
> ---------------------------------------------------------
>
> Key: ARROW-11058
> URL: https://issues.apache.org/jira/browse/ARROW-11058
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Rust - DataFusion
> Reporter: Andy Grove
> Assignee: Andy Grove
> Priority: Major
> Fix For: 3.0.0
>
>
> When we have a FilterExec in the plan, it can produce lots of small batches
> and we therefore lose efficiency of vectorized operations.
> We should implement a new CoalesceBatchExec and wrap every FilterExec with
> one of these so that small batches can be recombined into larger batches to
> improve the efficiency of upstream operators.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)