[ https://issues.apache.org/jira/browse/FLINK-23470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386107#comment-17386107 ]
Zhu Zhu commented on FLINK-23470:
---------------------------------

Sorry, I did not see the discussion in FLINK-23402 until I noticed this ticket.

I agree that a "pipeline within a slot" mode is of good value and is easy for users to understand. Actually, {{FORWARD_EDGES_PIPELINED}} was designed for this purpose. It works for table/SQL jobs. However, it may not work for DataStream jobs, because upstream and downstream tasks connected by a forward edge can be set to different slot sharing groups.

Therefore, to achieve this goal, I think we can modify the handling of {{FORWARD_EDGES_PIPELINED}} in {{StreamingJobGraphGenerator#determineResultPartitionType()}} a bit so that it generates a PIPELINED edge only if the upstream and downstream tasks are in the same slot sharing group. We may then drop {{RESCALE_EDGES_PIPELINED}} and rename {{FORWARD_EDGES_PIPELINED}} to {{PIPELINED_WITHIN_SLOT}}/{{AUTOMATIC}}.

> Use blocking shuffles but pipeline within a slot for batch mode
> ---------------------------------------------------------------
>
> Key: FLINK-23470
> URL: https://issues.apache.org/jira/browse/FLINK-23470
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Reporter: Timo Walther
> Priority: Major
>
> As discussed in FLINK-23402, we would like to introduce a good default
> shuffle mode for batch runtime mode that is a trade-off between all-pipelined
> and all-blocking shuffles.
>
> From the discussion in FLINK-23402:
>
> For the shuffle modes, I think these three settings are actually sufficient:
> 1. Pipeline all, for batch execution that wants pipelined shuffles (still
> batch recovery, no checkpoints, batch operators).
> 2. Batch all, just in case you want to.
> 3. Batch shuffles, pipeline within a slot. (DEFAULT)
>
> This should be the default, and it means we batch whenever a slot has a
> dependency on another slot.
> A dependency between slots is:
> - any all-to-all connection (keyBy, broadcast, rebalance, random)
> - any pointwise connection (rescale)
> - any forward connection between different slot sharing groups
>
> Effectively, only FORWARD connections within the same slot sharing group have
> no dependency on another slot.
>
> That mode makes a lot of sense as the default, because it guarantees that we
> can always run the program as long as we have at least one slot. No resource
> starvation ever. But it retains pipelining where we don't chain operators due
> to missing chaining logic (but we still slot-share them).
>
> Compared to this mode (3), FORWARD_EDGES_PIPELINED and
> POINTWISE_EDGES_PIPELINED are not well-defined.
> POINTWISE_EDGES_PIPELINED is a gamble: it only works if you have a certain
> amount of resources, related to the rescale factor. Otherwise the job may
> fail with resource starvation. It is hard for users to understand and debug,
> and not a great option in my opinion.
> FORWARD_EDGES_PIPELINED can also lead to job failure with resource starvation
> when the forward connection connects different slot sharing groups.
>
> That's why I would drop those (they make it confusing for users), not reuse
> the GlobalDataExchangeMode, and rather introduce option (3) above, which
> mostly batches the exchanges, except when they are guaranteed to be in
> the same slot.
>
> As a side note: the difference between (3) and (2) should already be
> relatively small in SQL jobs and become smaller over time, as more and more
> operators can be chained together.
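The rule both comments converge on (pipeline an exchange only when it is a forward edge whose two tasks share a slot sharing group; use a blocking shuffle for every other exchange) can be sketched in plain Java. This is a minimal, hypothetical illustration, not the actual code of {{StreamingJobGraphGenerator#determineResultPartitionType()}}: `ConnectionType`, `ShuffleKind`, `Edge`, `dependsOnAnotherSlot` and `shuffleFor` are made-up names, and `ShuffleKind` only stands in for Flink's result partition types. It also assumes the edge connects tasks that were not chained together, since chained operators do not exchange data over the network at all.

```java
import java.util.Objects;

// Hypothetical sketch of "batch shuffles, pipeline within a slot".
// None of these types are Flink classes; they only model the rule from the discussion.
public class PipelineWithinSlot {

    // Connection patterns mentioned in the ticket.
    enum ConnectionType { FORWARD, RESCALE, KEY_BY, BROADCAST, REBALANCE, RANDOM }

    // Illustrative stand-in for a result partition type.
    enum ShuffleKind { PIPELINED, BLOCKING }

    // An exchange between two tasks that were NOT chained together.
    static final class Edge {
        final ConnectionType type;
        final String upstreamSlotSharingGroup;
        final String downstreamSlotSharingGroup;

        Edge(ConnectionType type, String upstreamSlotSharingGroup, String downstreamSlotSharingGroup) {
            this.type = type;
            this.upstreamSlotSharingGroup = upstreamSlotSharingGroup;
            this.downstreamSlotSharingGroup = downstreamSlotSharingGroup;
        }
    }

    // True if the downstream task may depend on data produced in another slot.
    static boolean dependsOnAnotherSlot(Edge edge) {
        if (edge.type == ConnectionType.FORWARD) {
            // A forward edge stays inside one slot only if both tasks share a slot sharing group.
            return !Objects.equals(edge.upstreamSlotSharingGroup, edge.downstreamSlotSharingGroup);
        }
        // All-to-all (keyBy, broadcast, rebalance, random) and pointwise (rescale)
        // connections can read from other slots.
        return true;
    }

    // Pipeline within a slot, block across slots.
    static ShuffleKind shuffleFor(Edge edge) {
        return dependsOnAnotherSlot(edge) ? ShuffleKind.BLOCKING : ShuffleKind.PIPELINED;
    }

    public static void main(String[] args) {
        Edge[] edges = {
            new Edge(ConnectionType.FORWARD, "default", "default"),
            new Edge(ConnectionType.FORWARD, "default", "sink-group"),
            new Edge(ConnectionType.RESCALE, "default", "default"),
            new Edge(ConnectionType.KEY_BY, "default", "default"),
        };
        for (Edge e : edges) {
            System.out.println(e.type + " (" + e.upstreamSlotSharingGroup + " -> "
                    + e.downstreamSlotSharingGroup + "): " + shuffleFor(e));
        }
    }
}
```

In this sketch only the first edge (a forward edge inside the `default` group) comes out PIPELINED; the forward edge into `sink-group`, the rescale edge and the keyBy edge are all BLOCKING. Keeping the decision to a single slot-sharing-group check on forward edges mirrors the suggestion above of tightening the existing {{FORWARD_EDGES_PIPELINED}} handling rather than adding a new mechanism.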