austince edited a comment on pull request #15497: URL: https://github.com/apache/flink/pull/15497#issuecomment-815068947
> It's not really about consistency; but about the AdaptiveScheduler not _always_ explicitly setting the maxParallelism, such that if it was not set by a user we _always_ fall back to the maxParallelism defined in the savepoint, if it exists, irrespective of what max parallelism the scheduler came up with earlier.

I don't think this is quite the case -- the AdaptiveScheduler does _always_ explicitly set the maxParallelism, but if the user does not set a value explicitly, we now allow the default the AdaptiveScheduler has set to be overridden by the savepoint, as the current default scheduler does.

The awkwardness here comes from the fact that, in the AdaptiveScheduler, we first compute the parallelisms based on what the user has supplied in order to request resources, _which will set default max parallelisms if unset_. Only then do we compute the parallelisms based on what resources are actually available. This change ensures that the initially computed max parallelism is _always_ the one used for the ExecutionGraph _and_ can be reset by the savepoint if it was auto-configured, which is currently inconsistent with the default scheduler.

If we were able to read the savepoint first, this would not be necessary: we could remove the odd "state can reset max parallelism if auto-configured" behavior and just use the previously stored max parallelism when adjusting the JobGraph for execution, as we do currently:

https://github.com/apache/flink/blob/0eed15de872768cd2ec4a7a3384e49f7bbaeb483/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L891-L902

Perhaps a better title would have been "Use a consistent default and state restore logic for max parallelism in the Adaptive Scheduler".

> While it fixes the `RescalingITCase`, it also introduces the issue that we may use a smaller maxParallelism than we initially used for requesting resources, when the AdaptiveScheduler is used without reactive mode.

So even if the max parallelism was set to 8 based on the savepoint information, the scheduler will still initially ask for 128+ slots and hold on to them until the job terminates.

> In light of that, I'm wondering if we just shouldn't support this behavior in the adaptive scheduler.
> AFAICT, the reactive mode already doesn't support this anyway, because it actively checks that the maxParallelism extracted from the savepoint is not less than the default we came up with.

I think this is the larger issue -- perhaps we disallow this behavior in the AdaptiveScheduler for now, like you suggest, and then either:

* add support for it when we move the savepoint logic around, or
* remove support for it in the default scheduler as well.

> To remedy this we would need to reduce the max parallelism after the fact if this case occurs, but that will add further complexity for a really weird edge-case.

We could also adopt the invariant that is currently defined for reactive mode, where the savepoint can set the max parallelism as long as it is at least what has been auto-configured (sketched below), though I agree that this adds yet another bit of complexity to this tiny use case.
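To make that invariant concrete, here is a minimal, self-contained sketch -- plain Java, not Flink source. All class, method, and parameter names are illustrative, and the bounds/rounding assume what `KeyGroupRangeAssignment.computeDefaultMaxParallelism` does today:

```java
/**
 * Illustrative sketch only -- not Flink code. Shows the restore rule under
 * discussion: a savepoint may override an auto-configured max parallelism,
 * but (reactive-mode invariant) never shrink it below the default we already
 * requested resources for.
 */
final class MaxParallelismRestoreSketch {

    // Bounds assumed from KeyGroupRangeAssignment's defaults: [128, 32768].
    private static final int DEFAULT_LOWER_BOUND = 1 << 7;
    private static final int UPPER_BOUND = 1 << 15;

    /** Roughly what computeDefaultMaxParallelism does: round 1.5 * p up to a power of two, clamped. */
    static int defaultMaxParallelism(int parallelism) {
        int target = parallelism + parallelism / 2;
        int candidate = Integer.highestOneBit(target);
        if (candidate < target) {
            candidate <<= 1; // round up to the next power of two
        }
        return Math.min(Math.max(candidate, DEFAULT_LOWER_BOUND), UPPER_BOUND);
    }

    /**
     * Effective max parallelism on restore. An explicit user setting always
     * wins; otherwise the savepoint value wins, provided it does not undercut
     * the auto-configured default.
     *
     * @param userConfigured true if the user set maxParallelism explicitly
     * @param configuredMax the value the scheduler set up front (user value, or the computed default)
     * @param fromSavepoint the max parallelism stored in the savepoint, or -1 if none
     */
    static int effectiveMaxParallelism(boolean userConfigured, int configuredMax, int fromSavepoint) {
        if (userConfigured || fromSavepoint < 0) {
            return configuredMax; // explicit settings win; or no savepoint info to apply
        }
        if (fromSavepoint < configuredMax) {
            // Reactive-mode invariant: reject instead of silently holding more
            // slots than the job can ever use.
            throw new IllegalStateException(
                    "Savepoint max parallelism " + fromSavepoint
                            + " is below the auto-configured default " + configuredMax);
        }
        return fromSavepoint;
    }

    public static void main(String[] args) {
        int auto = defaultMaxParallelism(4); // clamps to 128 -- hence the "128+ slots" above
        System.out.println(effectiveMaxParallelism(false, auto, 256)); // 256: savepoint wins
        System.out.println(effectiveMaxParallelism(true, 8, 256));     // 8: user setting wins
    }
}
```

Under this rule, the savepoint-says-8 case from above would be rejected up front rather than leaving the scheduler holding 128+ slots it can never use.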
