Maximilian Michels created FLINK-34152:
------------------------------------------
Summary: Tune memory of autoscaled jobs
Key: FLINK-34152
URL: https://issues.apache.org/jira/browse/FLINK-34152
Project: Flink
Issue Type: New Feature
Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: kubernetes-operator-1.8.0
The current autoscaling algorithm adjusts the parallelism of the job task
vertices according to the processing needs. By adjusting the parallelism, we
systematically scale the amount of CPU for a task. At the same time, we also
indirectly change the amount of memory tasks have at their disposal. However,
there are some problems with this.
# Memory is overprovisioned: On scale up we may add more memory than we
actually need. Even on scale down, the memory / cpu ratio can still be off and
too much memory is used.
# Memory is underprovisioned: For stateful jobs, we risk running into
OutOfMemoryErrors on scale down. Even before running out of memory, too little
memory can have a negative impact on the effectiveness of the scaling.
We lack the capability to tune memory proportionally to the processing needs.
In the same way that we measure CPU usage and size the tasks accordingly, we
need to evaluate memory usage and adjust the heap memory size.
A tuning algorithm could look like this:
h2. 1. Establish a memory baseline
We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record
The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
Based on empirical data, this ratio is surprisingly constant.
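As a rough illustration of the ratio above, here is a minimal Python sketch. The function name, the input lists, and the sample numbers are hypothetical and are not taken from the autoscaler code. It assumes one heap usage value (in bytes) and one processed-record count per task manager over the same observation window.

```python
def memory_per_record(heap_usage_bytes, records_processed):
    """Estimate heap bytes per record: sum(heap_usage) / sum(records_processed)."""
    total_records = sum(records_processed)
    if total_records == 0:
        # Without any processed records the ratio is undefined.
        raise ValueError("no records processed in the observation window")
    return sum(heap_usage_bytes) / total_records


# Hypothetical observations from three task managers.
heap_usage = [2_000_000_000, 1_500_000_000, 2_500_000_000]
records = [1_000_000, 750_000, 1_250_000]
memory_per_rec = memory_per_record(heap_usage, records)
```

Guarding against a zero record count is an assumption of this sketch; the proposal does not say how an idle job should be handled.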
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much.
The max memory per TM should be equal to the initially defined user-specified
limit from the ResourceSpec.
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm)
{noformat}
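The scaling and clamping steps from section 3 can be sketched in Python as follows. This is only an illustration of the formulas as written, not the operator's implementation. The function name, parameter names, and example values are assumptions, and all memory values are taken to be in bytes.

```python
def tune_memory_per_tm(expected_records_per_sec, memory_per_rec,
                       num_task_managers, min_limit, max_limit):
    """Scale memory per task manager by load, then clamp it to [min_limit, max_limit]."""
    if num_task_managers <= 0:
        raise ValueError("num_task_managers must be positive")
    # Proportional scaling, as in the first formula of step 3.
    memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers
    # Lower bound: avoid scaling memory down too far.
    memory_per_tm = max(min_limit, memory_per_tm)
    # Upper bound: the user-specified limit from the ResourceSpec.
    memory_per_tm = min(max_limit, memory_per_tm)
    return memory_per_tm


GIB = 1 << 30  # bytes per GiB

# Hypothetical inputs: 3M records/s, 2000 bytes per record, 4 task managers.
tuned = tune_memory_per_tm(3_000_000, 2000, 4, min_limit=1 * GIB, max_limit=4 * GIB)
```

Because the upper bound is applied last, `max_limit` wins if the two limits are ever configured inconsistently (`min_limit > max_limit`), which matches the order of the two clamping lines in the proposal.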