Allen Wang created FLINK-32554:
----------------------------------

             Summary: Facilitate slot isolation and resource management for global committer
                 Key: FLINK-32554
                 URL: https://issues.apache.org/jira/browse/FLINK-32554
             Project: Flink
          Issue Type: Improvement
    Affects Versions: 1.16.2
            Reporter: Allen Wang

Flink's global committer executes a different workload from the source and sink operators. In some use cases it may require far more resources (CPU, memory) than the other operators.

However, according to this [source code|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java], it is currently not possible to isolate the global committer on a dedicated task manager or task slot, or to assign it more resources through fine-grained resource management. Flink always makes the global committer task share a task slot with another task. (In one test, we provided one more task slot than the source/sink parallelism required, but Flink still assigned the global committer to a slot shared with another task.)

As a result, we often see CPU utilization spike on the task manager that runs the global committer, compared with the other task managers, and that task manager becomes the bottleneck for the job. Because of slot sharing and the inadequate resources available to the global committer, the job takes a long time to initialize after a restart and checkpoints take a long time to complete. Our job consumes from Kafka, and this bottleneck causes a significant increase in consumer lag. The lag in turn forces the Kafka source operator to replay the backlog, which consumes more CPU on the source operator and further starves the global committer running in the same task slot.

At minimum, we want to be able to configure the global committer to run in its own task slot, and for that to work under reactive scaling. It would also be valuable to make fine-grained resource management work for the global committer.
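
For readers less familiar with slot sharing, the behavior seen in the test above can be illustrated with a small model. This is only a sketch and not Flink's scheduler: the class `SlotSharingModel`, the `Vertex` type, the vertex names, and the parallelism of 4 are all hypothetical. It assumes the documented rule that tasks in the same slot sharing group may share slots, so a job needs as many slots as the sum, over all groups, of the highest parallelism in each group.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of slot sharing; none of these types are Flink classes.
class SlotSharingModel {

    /** Hypothetical stand-in for a job vertex. */
    static final class Vertex {
        final String name;
        final int parallelism;
        final String slotSharingGroup;

        Vertex(String name, int parallelism, String slotSharingGroup) {
            this.name = name;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
        }
    }

    /** Slots needed = sum over groups of the highest parallelism in each group. */
    static int requiredSlots(List<Vertex> vertices) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Vertex v : vertices) {
            maxPerGroup.merge(v.slotSharingGroup, v.parallelism, Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    /** Illustrative job: source and sink writer at parallelism 4 (assumed), committer at 1. */
    static List<Vertex> job(String committerGroup) {
        return Arrays.asList(
            new Vertex("kafka-source", 4, "default"),
            new Vertex("sink-writer", 4, "default"),
            new Vertex("global-committer", 1, committerGroup));
    }

    public static void main(String[] args) {
        // Committer in the default group: it fits into an existing slot.
        System.out.println("shared:   " + requiredSlots(job("default")));   // 4
        // Committer in its own group: one additional slot is needed.
        System.out.println("isolated: " + requiredSlots(job("committer"))); // 5
    }
}
```

Under this model, offering a fifth slot while the committer stays in the default group changes nothing, because the job only ever requests four; the extra slot sits idle. For ordinary DataStream operators a separate group can be requested with `slotSharingGroup(...)`, but as described above the global committer's group is not exposed for configuration.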