[
https://issues.apache.org/jira/browse/FLINK-23402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17382130#comment-17382130
]
Stephan Ewen edited comment on FLINK-23402 at 7/16/21, 3:18 PM:
----------------------------------------------------------------
Big +1 to have a single setting for the shuffle mode in DataStream and Table,
and also using the same default values.
Otherwise this will confuse the heck out of users.
I am not 100% sure if we should expose the {{setExchangeMode()}} method as
such on the {{StreamExecutionEnvironment}}.
You can actually configure values that don't make sense together, like
{{RuntimeMode.STREAMING}} and {{ExchangeMode.BATCH}}.
Let's quickly check why the runtime mode alone would not be enough.
RuntimeMode.STREAMING means everything is pipelined anyway.
RuntimeMode.BATCH means you have some freedom:
- everything pipelined
- batch shuffles, pipelined forwards
- batch everything
Since the shuffle behavior is only configurable for BATCH anyways, shouldn't
this be a property of RuntimeMode.BATCH?
Something like:
- {{RuntimeMode.BATCH}} <= default setting for batch
- {{RuntimeMode.BATCH.withBatchAllExchanges()}} <= also batches forward
connections
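A minimal sketch of how such an API could be shaped, assuming the exchange behavior becomes a property of the batch mode. Everything here is hypothetical and not Flink's actual API: the class {{RuntimeModeSketch}}, the {{Exchanges}} enum and its constant names, and the extra {{withPipelinedExchanges()}} method are made up for illustration. The point is only that the streaming type has no exchange setters, so the invalid combination cannot be expressed.

```java
/** Hypothetical sketch; none of these types are Flink's actual API. */
final class RuntimeModeSketch {

    /** Illustrative names for the three exchange behaviors listed above. */
    enum Exchanges {
        ALL_PIPELINED,
        SHUFFLES_BATCHED_FORWARDS_PIPELINED,
        ALL_BATCHED
    }

    interface RuntimeMode {
        Exchanges exchanges();
    }

    /** Streaming has nothing to configure: every exchange is pipelined. */
    static final class Streaming implements RuntimeMode {
        private Streaming() {}

        @Override
        public Exchanges exchanges() {
            return Exchanges.ALL_PIPELINED;
        }
    }

    /** Batch carries its exchange behavior and hands out immutable variants. */
    static final class Batch implements RuntimeMode {
        private final Exchanges exchanges;

        private Batch(Exchanges exchanges) {
            this.exchanges = exchanges;
        }

        @Override
        public Exchanges exchanges() {
            return exchanges;
        }

        /** Variant that also batches forward connections. */
        Batch withBatchAllExchanges() {
            return new Batch(Exchanges.ALL_BATCHED);
        }

        /** Variant that pipelines everything (assumed method name). */
        Batch withPipelinedExchanges() {
            return new Batch(Exchanges.ALL_PIPELINED);
        }
    }

    static final Streaming STREAMING = new Streaming();

    /** Assumed default for batch: batch shuffles, pipelined forwards. */
    static final Batch BATCH = new Batch(Exchanges.SHUFFLES_BATCHED_FORWARDS_PIPELINED);

    private RuntimeModeSketch() {}
}
```

With this shape, {{RuntimeModeSketch.STREAMING.withBatchAllExchanges()}} does not compile, so no runtime check for that combination is needed.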
> Expose a consistent GlobalDataExchangeMode
> ------------------------------------------
>
> Key: FLINK-23402
> URL: https://issues.apache.org/jira/browse/FLINK-23402
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream
> Reporter: Timo Walther
> Priority: Major
>
> The Table API makes the {{GlobalDataExchangeMode}} configurable via
> {{table.exec.shuffle-mode}}.
> In Table API batch mode the StreamGraph is configured with
> {{ALL_EDGES_BLOCKING}} and in DataStream API batch mode
> {{FORWARD_EDGES_PIPELINED}}.
> I would vote for unifying the exchange mode of both APIs so that complex SQL
> pipelines behave identically in {{StreamTableEnvironment}} and
> {{TableEnvironment}}. Also, the feedback I got so far suggests that
> {{ALL_EDGES_BLOCKING}} is the safer option for running pipelines successfully
> with limited resources.
> [~lzljs3620320]
> {noformat}
> The history is as follows:
> - The default value used to be pipelined, and we found that deployments often
> hung due to insufficient resources. Batch jobs typically run with few
> resources and high parallelism, because the granularity of failover in a batch
> job depends on the amount of data processed by a single task: the smaller the
> amount of data, the faster the recovery. So most scenarios run with few
> resources and high parallelism, progressing slowly, little by little.
> - Later, we switched the default value to blocking. We found that the improved
> blocking shuffle implementation did not slow down execution much.
> We tested TPC-DS and it took almost the same time.
> {noformat}
> [~dwysakowicz]
> {noformat}
> I don't see a problem with changing the default value for DataStream batch
> mode if you think ALL_EDGES_BLOCKING is the better default option.
> {noformat}
> In any case, we should make this configurable for DataStream API users and
> make the specific Table API option obsolete.
> It would include the following steps:
> - Move {{GlobalDataExchangeMode}} from {{o.a.f.streaming.api.graph}} to
> {{o.a.f.api.common}} (with reworked JavaDocs) as {{ExchangeMode}} (to have it
> shorter) next to {{RuntimeMode}}
> - Add {{StreamExecutionEnvironment.setExchangeMode()}} next to
> {{setRuntimeMode}}
> - Add option {{execution.exchange-mode}}
> - Add checks for invalid combinations to StreamGraphGenerator
> - Deprecate ExecutionMode ([avoid
> confusion|https://stackoverflow.com/questions/68335472/what-is-difference-in-runtimeexecutionmode-and-executionmode])
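The "checks for invalid combinations" step could look roughly like the sketch below. It is illustrative only and not the actual {{StreamGraphGenerator}} code: the class {{ExchangeModeCheckSketch}}, the {{validate}} method, and the two enums are stand-ins defined locally, and {{ALL_EDGES_PIPELINED}} is assumed here as the one value that is valid for streaming.

```java
/** Hypothetical sketch of the combination check; not Flink's actual code. */
final class ExchangeModeCheckSketch {

    /** Local stand-in for the runtime mode. */
    enum RuntimeMode {
        STREAMING,
        BATCH
    }

    /** Local stand-in for the proposed ExchangeMode enum. */
    enum ExchangeMode {
        ALL_EDGES_PIPELINED, // assumed value, not named in the issue text
        FORWARD_EDGES_PIPELINED,
        ALL_EDGES_BLOCKING
    }

    /**
     * Rejects exchange modes that make no sense for the given runtime mode.
     * Streaming requires fully pipelined exchanges; batch accepts any mode.
     */
    static void validate(RuntimeMode runtimeMode, ExchangeMode exchangeMode) {
        if (runtimeMode == RuntimeMode.STREAMING
                && exchangeMode != ExchangeMode.ALL_EDGES_PIPELINED) {
            throw new IllegalArgumentException(
                    "Exchange mode " + exchangeMode
                            + " is not supported in runtime mode " + runtimeMode);
        }
    }

    private ExchangeModeCheckSketch() {}
}
```

Failing fast at graph generation time keeps the error close to the misconfiguration instead of surfacing it later during scheduling.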