[ 
https://issues.apache.org/jira/browse/FLINK-23402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382147#comment-17382147
 ] 

Stephan Ewen edited comment on FLINK-23402 at 7/16/21, 5:35 PM:
----------------------------------------------------------------

For the shuffle modes, I think those three settings are actually sufficient:

  1. pipeline all: for batch execution that wants pipelined shuffles (still 
batch recovery, no checkpoints, batch operators).
  2. batch all: just in case you want it.
  3. batch shuffles, pipeline within a slot (DEFAULT).

Mode (3) should be the default, and it means we *batch whenever a slot has a 
dependency on another slot.*

A dependency between slots is:
  - any all-to-all connection (keyBy, broadcast, rebalance, random)
  - any pointwise connection (rescale)
  - any forward between different slot sharing groups

Effectively, only FORWARD connections within the same slot sharing group have 
no dependency on another slot.
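To make the rule concrete, here is a minimal Java sketch of how mode (3) could decide, per edge, between a blocking and a pipelined exchange. The enums {{ShuffleMode}} and {{Partitioning}} and the {{Edge}} record are hypothetical stand-ins, not Flink's actual classes; ALL_TO_ALL lumps together keyBy, broadcast, rebalance and random.

```java
import java.util.List;

public class ShuffleModeSketch {

    // The three modes proposed above (hypothetical names, not a Flink API).
    enum ShuffleMode { PIPELINE_ALL, BATCH_ALL, BATCH_SHUFFLES_PIPELINE_WITHIN_SLOT }

    // Simplified partitioner kinds: ALL_TO_ALL stands for keyBy, broadcast, rebalance, random;
    // POINTWISE stands for rescale.
    enum Partitioning { FORWARD, POINTWISE, ALL_TO_ALL }

    record Edge(String source, String target, Partitioning partitioning,
                String sourceGroup, String targetGroup) {}

    /** True if the edge makes one slot depend on another slot. */
    static boolean crossesSlots(Edge e) {
        return e.partitioning() != Partitioning.FORWARD
                || !e.sourceGroup().equals(e.targetGroup());
    }

    /** True if the exchange on this edge is blocking (batch) under the given mode. */
    static boolean isBlocking(Edge e, ShuffleMode mode) {
        switch (mode) {
            case PIPELINE_ALL:
                return false;
            case BATCH_ALL:
                return true;
            default:
                // Mode (3): batch whenever the edge crosses slots, pipeline otherwise.
                return crossesSlots(e);
        }
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
                new Edge("source", "map", Partitioning.FORWARD, "default", "default"),
                new Edge("map", "window", Partitioning.ALL_TO_ALL, "default", "default"),
                new Edge("window", "sink", Partitioning.FORWARD, "default", "sinks"));
        for (Edge e : edges) {
            boolean blocking = isBlocking(e, ShuffleMode.BATCH_SHUFFLES_PIPELINE_WITHIN_SLOT);
            System.out.println(e.source() + " -> " + e.target() + ": "
                    + (blocking ? "blocking" : "pipelined"));
        }
    }
}
```

With the default mode, source -> map stays pipelined, while the keyBy-style exchange and the forward edge into the separate "sinks" slot sharing group become blocking.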

That mode makes a lot of sense as the default, because it guarantees that we 
*can always run the program as long as we have at least one slot*: no resource 
starvation, ever. At the same time, it retains pipelining where we don't chain 
operators due to missing chaining logic (but still slot-share them).
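A rough way to check the "at least one slot" claim is to compute pipelined regions at subtask level and count the slots each region needs. The sketch assumes that all subtasks of a pipelined region must run at the same time and that a slot in a slot sharing group holds at most one subtask per vertex; its {{Vertex}}/{{Edge}} types are hypothetical and it is not Flink's scheduler. It compares mode (3) with pipeline all on a small source, map, sink job.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PipelinedRegionSketch {

    enum Partitioning { FORWARD, ALL_TO_ALL }

    // A job vertex with its parallelism and slot sharing group (hypothetical model).
    record Vertex(String name, int parallelism, String group) {}

    record Edge(Vertex source, Vertex target, Partitioning partitioning) {}

    // Union-find lookup over subtask ids of the form "name#index".
    static String find(Map<String, String> parent, String x) {
        while (!parent.get(x).equals(x)) {
            x = parent.get(x);
        }
        return x;
    }

    /**
     * Largest number of slots needed by any pipelined region.
     * pipelineAll = true models "pipeline all"; false models mode (3), where only
     * FORWARD edges inside one slot sharing group stay pipelined.
     */
    static int maxSlotsPerRegion(List<Vertex> vertices, List<Edge> edges, boolean pipelineAll) {
        Map<String, String> parent = new HashMap<>();
        for (Vertex v : vertices) {
            for (int i = 0; i < v.parallelism(); i++) {
                parent.put(v.name() + "#" + i, v.name() + "#" + i);
            }
        }
        for (Edge e : edges) {
            boolean withinSlot = e.partitioning() == Partitioning.FORWARD
                    && e.source().group().equals(e.target().group());
            if (!pipelineAll && !withinSlot) {
                continue; // blocking exchange: producer and consumer stay in separate regions
            }
            for (int i = 0; i < e.source().parallelism(); i++) {
                for (int j = 0; j < e.target().parallelism(); j++) {
                    if (e.partitioning() == Partitioning.FORWARD && i != j) {
                        continue; // FORWARD only connects subtask i to subtask i
                    }
                    String a = find(parent, e.source().name() + "#" + i);
                    String b = find(parent, e.target().name() + "#" + j);
                    parent.put(a, b); // merge the two regions
                }
            }
        }
        // region root -> slot sharing group -> vertex name -> number of subtasks
        Map<String, Map<String, Map<String, Integer>>> counts = new HashMap<>();
        for (Vertex v : vertices) {
            for (int i = 0; i < v.parallelism(); i++) {
                counts.computeIfAbsent(find(parent, v.name() + "#" + i), k -> new HashMap<>())
                        .computeIfAbsent(v.group(), k -> new HashMap<>())
                        .merge(v.name(), 1, Integer::sum);
            }
        }
        int max = 0;
        for (Map<String, Map<String, Integer>> byGroup : counts.values()) {
            int slots = 0;
            for (Map<String, Integer> byVertex : byGroup.values()) {
                // One slot of a group holds at most one subtask of each vertex.
                slots += Collections.max(byVertex.values());
            }
            max = Math.max(max, slots);
        }
        return max;
    }

    public static void main(String[] args) {
        Vertex source = new Vertex("source", 4, "default");
        Vertex map = new Vertex("map", 4, "default");
        Vertex sink = new Vertex("sink", 4, "default");
        List<Vertex> vertices = List.of(source, map, sink);
        List<Edge> edges = List.of(
                new Edge(source, map, Partitioning.FORWARD),
                new Edge(map, sink, Partitioning.ALL_TO_ALL));
        System.out.println("mode (3): " + maxSlotsPerRegion(vertices, edges, false) + " slot(s)");
        System.out.println("pipeline all: " + maxSlotsPerRegion(vertices, edges, true) + " slot(s)");
    }
}
```

Under these assumptions, with a parallelism of 4 every region in mode (3) fits into 1 slot, while pipeline all turns the whole job into one region that needs 4 slots at once.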

Compared to mode (3), {{FORWARD_EDGES_PIPELINED}} and 
{{POINTWISE_EDGES_PIPELINED}} are not well-defined:
  - {{POINTWISE_EDGES_PIPELINED}} is a gamble: it only works if you have a 
certain amount of resources, related to the rescale factor, which is hard to 
understand. Otherwise, the job may fail with resource starvation.
  - {{FORWARD_EDGES_PIPELINED}} can also lead to job failure with resource 
starvation when the forward connection connects different slot sharing groups.

That's why I would drop those (they confuse users) and not reuse the 
{{GlobalDataExchangeMode}}.
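The dependency on the rescale factor can be put into numbers with a simplified model: assume a pipelined rescale edge splits the subtasks on the larger side into contiguous groups of near-equal size, each pipelined with one subtask on the smaller side, and that nothing else in the region is pipelined. The hypothetical {{slotsPerRegion}} helper below then gives the slots one such region needs; chains of several pointwise edges would merge regions further, which this sketch ignores.

```java
public class PointwiseSlotSketch {

    /**
     * Slots needed by one pipelined region formed by a single rescale (pointwise) edge.
     * Simplified model: the larger side is split into contiguous groups of near-equal
     * size, each group is pipelined with one subtask of the smaller side, and nothing
     * else in the region is pipelined.
     */
    static int slotsPerRegion(int upstreamParallelism, int downstreamParallelism,
                              boolean sameSlotSharingGroup) {
        int larger = Math.max(upstreamParallelism, downstreamParallelism);
        int smaller = Math.min(upstreamParallelism, downstreamParallelism);
        int groupSize = (larger + smaller - 1) / smaller; // ceil(larger / smaller), the largest group
        // In one slot sharing group, the subtask on the smaller side can share a slot with
        // one subtask of its group; across groups it needs a slot of its own.
        return sameSlotSharingGroup ? groupSize : groupSize + 1;
    }

    public static void main(String[] args) {
        int[][] rescales = {{4, 4}, {8, 2}, {2, 8}, {100, 10}};
        for (int[] r : rescales) {
            System.out.printf("rescale %d -> %d: %d slot(s) in one group, %d across groups%n",
                    r[0], r[1], slotsPerRegion(r[0], r[1], true), slotsPerRegion(r[0], r[1], false));
        }
    }
}
```

Under these assumptions, a rescale from 8 to 2 needs 4 slots per region within one slot sharing group and 5 across groups, and even a 1:1 edge (4 to 4) needs 2 slots once it crosses slot sharing groups, which is the {{FORWARD_EDGES_PIPELINED}} case above.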

[~dwysakowicz] and [~arvid], would be good to hear your thoughts on this one.


> Expose a consistent GlobalDataExchangeMode
> ------------------------------------------
>
>                 Key: FLINK-23402
>                 URL: https://issues.apache.org/jira/browse/FLINK-23402
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream
>            Reporter: Timo Walther
>            Priority: Major
>
> The Table API makes the {{GlobalDataExchangeMode}} configurable via 
> {{table.exec.shuffle-mode}}.
> In Table API batch mode the StreamGraph is configured with 
> {{ALL_EDGES_BLOCKING}} and in DataStream API batch mode 
> {{FORWARD_EDGES_PIPELINED}}.
> I would vote for unifying the exchange mode of both APIs so that complex SQL 
> pipelines behave identically in {{StreamTableEnvironment}} and 
> {{TableEnvironment}}. Also, the feedback I got so far suggests that 
> {{ALL_EDGES_BLOCKING}} is a safer option for running pipelines successfully 
> with limited resources.
> [~lzljs3620320]
> {noformat}
> The previous history was like this:
> - The default value used to be pipelined, and we found that deployment often 
> hung due to insufficient resources. The typical use of batch jobs is running 
> large parallelism on small resources, because in batch jobs the granularity 
> of failover is related to the amount of data processed by a single task: the 
> smaller the amount of data, the faster the fault tolerance. So most 
> scenarios run with small resources and large parallelism, slowly, little by 
> little.
> - Later, we switched the default value to blocking. We found that a better 
> blocking shuffle implementation would not slow down the running speed much. 
> We tested TPC-DS and it took almost the same time.
> {noformat}
> [~dwysakowicz]
> {noformat}
> I don't see a problem with changing the default value for DataStream batch 
> mode if you think ALL_EDGES_BLOCKING is the better default option.
> {noformat}
> In any case, we should make this configurable for DataStream API users and 
> make the specific Table API option obsolete.
> It would include the following steps:
> - Move {{GlobalDataExchangeMode}} from {{o.a.f.streaming.api.graph}} to 
> {{o.a.f.api.common}} (with reworked JavaDocs) as {{ExchangeMode}} (a shorter 
> name) next to {{RuntimeMode}}
> - Add {{StreamExecutionEnvironment.setExchangeMode()}} next to 
> {{setRuntimeMode}}
> - Add option {{execution.exchange-mode}}
> - Add checks for invalid combinations to StreamGraphGenerator
> - Deprecate ExecutionMode ([avoid 
> confusion|https://stackoverflow.com/questions/68335472/what-is-difference-in-runtimeexecutionmode-and-executionmode])
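The proposed steps could look roughly like the following self-contained Java sketch. {{ExchangeMode}}, {{setExchangeMode}} and {{execution.exchange-mode}} are the names proposed in this issue, not an existing Flink API; the {{Environment}} class is a mock, and the validation rule (non-pipelined exchanges only in BATCH runtime mode) is an assumed example of an "invalid combination".

```java
import java.util.Locale;
import java.util.Map;

public class ExchangeModeSketch {

    // Stand-ins for the runtime mode and the proposed ExchangeMode (hypothetical).
    enum RuntimeMode { STREAMING, BATCH }

    enum ExchangeMode {
        ALL_EDGES_BLOCKING, FORWARD_EDGES_PIPELINED, POINTWISE_EDGES_PIPELINED, ALL_EDGES_PIPELINED
    }

    // Option key proposed in the issue.
    static final String EXCHANGE_MODE_KEY = "execution.exchange-mode";

    // Mock environment, not Flink's StreamExecutionEnvironment.
    static class Environment {
        private RuntimeMode runtimeMode = RuntimeMode.STREAMING;
        private ExchangeMode exchangeMode = ExchangeMode.ALL_EDGES_PIPELINED;

        Environment setRuntimeMode(RuntimeMode mode) {
            this.runtimeMode = mode;
            return this;
        }

        Environment setExchangeMode(ExchangeMode mode) {
            this.exchangeMode = mode;
            return this;
        }

        /** Applies the option from a key/value configuration, e.g. "all-edges-blocking". */
        Environment configure(Map<String, String> conf) {
            String value = conf.get(EXCHANGE_MODE_KEY);
            if (value != null) {
                exchangeMode = ExchangeMode.valueOf(
                        value.trim().toUpperCase(Locale.ROOT).replace('-', '_'));
            }
            return this;
        }

        /** The kind of check the issue wants in StreamGraphGenerator; this rule is assumed. */
        void validate() {
            if (runtimeMode == RuntimeMode.STREAMING && exchangeMode != ExchangeMode.ALL_EDGES_PIPELINED) {
                throw new IllegalStateException(
                        "Exchange mode " + exchangeMode + " requires runtime mode BATCH");
            }
        }

        ExchangeMode exchangeMode() {
            return exchangeMode;
        }
    }

    public static void main(String[] args) {
        Environment env = new Environment()
                .setRuntimeMode(RuntimeMode.BATCH)
                .configure(Map.of(EXCHANGE_MODE_KEY, "all-edges-blocking"));
        env.validate();
        System.out.println("exchange mode: " + env.exchangeMode());
    }
}
```

Keeping the setter and the option side by side mirrors how {{setRuntimeMode}} and its option coexist, so the same choice can be made in code or in the cluster configuration.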



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
