[ https://issues.apache.org/jira/browse/FLINK-30198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17640380#comment-17640380 ]

Weijie Guo commented on FLINK-30198:
------------------------------------

IMO, it seems too strange to identify the hash input and then define the 
reducer task from it. So +1 for [~wanglijie]'s proposal to define a per-task 
data volume for each stage (job vertex). Maybe we can first introduce support 
for setting the consumed data volume at job-vertex granularity, and then 
introduce a SQL/Planner-level configuration to set the per-task data volume 
for join/agg/sort operators. In this way, manual configuration for DataStream 
jobs can also be supported. Of course, any better proposal is also welcome.
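
To make the per-vertex idea concrete, here is a minimal, self-contained
sketch under stated assumptions: the {{VertexSchedulerHints}} holder and its
{{setAvgDataVolumePerTask}}/{{volumeFor}} methods are hypothetical
illustrations, not existing Flink API. It only shows the intended behavior,
a per-vertex override that falls back to the global default.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Flink exposes no such setter today. It illustrates
// attaching a per-task data volume to each stage (job vertex) instead of
// relying on one global value.
public class PerVertexDataVolumeSketch {

    /** Assumed holder for per-vertex scheduler hints (not Flink API). */
    static class VertexSchedulerHints {
        private final Map<String, Long> avgDataVolumePerTask = new HashMap<>();

        /** Associate a desired per-task data volume (in bytes) with a vertex. */
        void setAvgDataVolumePerTask(String vertexName, long bytes) {
            avgDataVolumePerTask.put(vertexName, bytes);
        }

        /** Fall back to the global default when no per-vertex value was set. */
        long volumeFor(String vertexName, long globalDefaultBytes) {
            return avgDataVolumePerTask.getOrDefault(vertexName, globalDefaultBytes);
        }
    }

    public static void main(String[] args) {
        VertexSchedulerHints hints = new VertexSchedulerHints();
        long oneGib = 1024L * 1024L * 1024L;

        // The mapper stage keeps the global default; the reducer stage, which
        // runs heavier agg/sort/join logic, gets a smaller per-task volume so
        // the scheduler would derive a higher parallelism for it.
        hints.setAvgDataVolumePerTask("HashAggregate", 256L * 1024L * 1024L);

        System.out.println(hints.volumeFor("Source", oneGib));        // 1 GiB
        System.out.println(hints.volumeFor("HashAggregate", oneGib)); // 256 MiB
    }
}
{code}

A SQL/Planner-level configuration could then fill such hints automatically
for join/agg/sort operators, while DataStream users would set them by hand.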

> Support AdaptiveBatchScheduler to set per-task size for reducer task 
> ---------------------------------------------------------------------
>
>                 Key: FLINK-30198
>                 URL: https://issues.apache.org/jira/browse/FLINK-30198
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Aitozi
>            Priority: Major
>
> When we use the AdaptiveBatchScheduler, we found that it works well in 
> most cases, but there is a limitation: there is only one global parameter 
> for the per-task data size, 
> {{jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task}} (a 
> configuration example follows the quoted description). 
> However, in a map-reduce architecture, the reducer tasks usually have 
> more complex computation logic, such as aggregate/sort/join operators. So I 
> think it would be nicer if we could set the reducer and mapper tasks' 
> per-task data size individually.
> Then, how do we distinguish the reducer tasks?
> IMO, we can let the parallelism decider know whether a vertex has hash 
> edge inputs. If it does, it should be a reducer task (see the sketch 
> after this quoted description).
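
For reference, this is how the single global knob described above is set
today. The option keys are taken from the issue text and the Flink docs for
the adaptive batch scheduler; the {{1g}} value is purely illustrative.

{code:java}
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalVolumeToday {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Enable the adaptive batch scheduler (per the Flink docs it also
        // expects blocking shuffles, i.e. execution.batch-shuffle-mode set
        // to ALL_EXCHANGES_BLOCKING).
        conf.setString("jobmanager.scheduler", "AdaptiveBatch");
        // The one global per-task data volume: it applies to every job vertex,
        // mappers and reducers alike, which is the limitation reported here.
        conf.setString(
                "jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task",
                "1g");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // ... build the batch job and call env.execute() ...
    }
}
{code}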
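
And a minimal sketch of the heuristic the description proposes: classify a
vertex as a reducer when any input edge is hash-partitioned, and hand the
parallelism decider a different per-task volume for it. The {{EdgeKind}}
enum and {{decideVolume}} helper are hypothetical, not Flink internals.

{code:java}
import java.util.List;

public class ReducerDetectionSketch {

    /** Assumed input-edge partitioning kinds; names are illustrative. */
    enum EdgeKind { HASH, FORWARD, REBALANCE, BROADCAST }

    /** Pick the per-task data volume the parallelism decider should use. */
    static long decideVolume(List<EdgeKind> inputEdges,
                             long mapperVolumeBytes,
                             long reducerVolumeBytes) {
        // A vertex consuming at least one hash edge is treated as a reducer.
        boolean isReducer = inputEdges.stream().anyMatch(e -> e == EdgeKind.HASH);
        return isReducer ? reducerVolumeBytes : mapperVolumeBytes;
    }

    public static void main(String[] args) {
        long mapper = 1024L * 1024L * 1024L; // 1 GiB for plain mapper stages
        long reducer = 256L * 1024L * 1024L; // smaller volume => more reducer tasks

        // A source/mapper vertex with only forward inputs keeps the mapper volume.
        System.out.println(decideVolume(List.of(EdgeKind.FORWARD), mapper, reducer));
        // A join/agg vertex with a hash input gets the reducer volume.
        System.out.println(decideVolume(
                List.of(EdgeKind.HASH, EdgeKind.FORWARD), mapper, reducer));
    }
}
{code}

As the comment above points out, inferring the reducer role this way is what
feels fragile compared with an explicit per-vertex setting.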



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
