[ 
https://issues.apache.org/jira/browse/FLINK-23470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386107#comment-17386107
 ] 

Zhu Zhu commented on FLINK-23470:
---------------------------------

Sorry, I did not see the discussion in FLINK-23402 until I noticed this ticket.
I agree that a "pipeline within a slot" mode is valuable and easy for 
users to understand.
Actually, {{FORWARD_EDGES_PIPELINED}} was designed for this purpose. It was 
working for Table/SQL jobs. However, it may not work for DataStream jobs, 
because the upstream and downstream tasks of a forward edge can be assigned 
to different slot sharing groups.

Therefore, to achieve this goal, I think we can modify the handling of 
{{FORWARD_EDGES_PIPELINED}} in 
{{StreamingJobGraphGenerator#determineResultPartitionType()}} a bit to generate 
a PIPELINED edge only if the upstream and downstream tasks are in the same slot 
sharing group. And we may drop {{POINTWISE_EDGES_PIPELINED}} and rename 
{{FORWARD_EDGES_PIPELINED}} to {{PIPELINED_WITHIN_SLOT}}/{{AUTOMATIC}}.
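
A minimal sketch of the rule described above, using simplified stand-in types 
rather than Flink's actual classes. The class {{ShuffleModeSketch}}, its enums, 
the mode names and the method signature are all assumptions made for 
illustration; slot sharing groups are modeled as plain strings. The real 
{{StreamingJobGraphGenerator#determineResultPartitionType()}} may look quite 
different.

```java
import java.util.Objects;

// Hypothetical, simplified model for illustration only; these are NOT Flink's classes.
final class ShuffleModeSketch {

    enum PartitionType { PIPELINED, BLOCKING }

    // Assumed mode names. PIPELINED_WITHIN_SLOT is one of the names suggested in this comment.
    enum ExchangeMode { ALL_EDGES_PIPELINED, ALL_EDGES_BLOCKING, PIPELINED_WITHIN_SLOT }

    // Simplified set of connection patterns between two tasks.
    enum Partitioner { FORWARD, RESCALE, REBALANCE, KEY_BY, BROADCAST }

    /**
     * Decides the result partition type of one edge.
     * In PIPELINED_WITHIN_SLOT mode an edge is pipelined only if it is a
     * FORWARD edge and both tasks are in the same slot sharing group;
     * every other edge is treated as a dependency between slots and blocks.
     */
    static PartitionType determineResultPartitionType(
            ExchangeMode mode,
            Partitioner partitioner,
            String upstreamSlotSharingGroup,
            String downstreamSlotSharingGroup) {
        if (mode == ExchangeMode.ALL_EDGES_PIPELINED) {
            return PartitionType.PIPELINED;
        }
        if (mode == ExchangeMode.ALL_EDGES_BLOCKING) {
            return PartitionType.BLOCKING;
        }
        // PIPELINED_WITHIN_SLOT: compare the slot sharing group names.
        boolean sameGroup =
                Objects.equals(upstreamSlotSharingGroup, downstreamSlotSharingGroup);
        return (partitioner == Partitioner.FORWARD && sameGroup)
                ? PartitionType.PIPELINED
                : PartitionType.BLOCKING;
    }
}
```

With this sketch, a FORWARD edge between two tasks in group "default" comes out 
as PIPELINED, while the same edge between groups "a" and "b", or a RESCALE edge 
within one group, comes out as BLOCKING.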

> Use blocking shuffles but pipeline within a slot for batch mode
> ---------------------------------------------------------------
>
>                 Key: FLINK-23470
>                 URL: https://issues.apache.org/jira/browse/FLINK-23470
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>            Reporter: Timo Walther
>            Priority: Major
>
> As discussed in FLINK-23402, we would like to introduce a good default 
> shuffle mode for batch runtime mode that is a trade-off between all pipelined 
> and all blocking shuffles.
> From the discussion in FLINK-23402:
> For the shuffle modes, I think those three settings are actually sufficient:
> 1. pipeline all, for batch execution that wants pipelined shuffles. (Still 
> batch recovery, no checkpoints, batch operators)
> 2. batch all, just in case you want to.
> 3. batch shuffles, pipeline within a slot. (DEFAULT)
> This should be the default, and it means we batch whenever a slot has a 
> dependency on another slot.
> A dependency between slots is:
> - any all-to-all connection (keyBy, broadcast, rebalance, random)
> - any pointwise connection (rescale)
> - any forward between different slot sharing groups
> Effectively, only FORWARD connections within the same slot sharing group have 
> no dependency on another slot.
> That mode makes a lot of sense as the default, because it guarantees that we 
> can always run the program as long as we have at least one slot. No resource 
> starvation ever. But it retains pipelining where we don't chain operators due 
> to missing chaining logic (but we still slot-share them).
> Compared to this (3) mode, FORWARD_EDGES_PIPELINED and 
> POINTWISE_EDGES_PIPELINED are not well-defined.
> POINTWISE_EDGES_PIPELINED is a gamble: it only works if you have a certain 
> amount of resources, related to the rescale factor. Otherwise the job may 
> fail with resource starvation. That is hard for users to understand and 
> debug; not a great option in my opinion.
> FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation 
> when the forward connection connects different slot sharing groups.
> That's why I would drop those (they make it confusing for users), not reuse 
> the GlobalDataExchangeMode, and rather introduce the option (3) above, which 
> mostly batches the exchanges, except when they are guaranteed to be in 
> the same slot.
> As a side note: The difference between (3) and (2) should be already 
> relatively small in SQL jobs and become smaller over time, as more and more 
> can be chained together.


