ASF GitHub Bot commented on FLINK-3322:

Github user ramkrish86 commented on the issue:

    Just trying to explain what my intention was. This is particularly to 
address the sorters.
    We could create memory allocators inside the batch task, one for each 
input. So every iterative task will have a memory allocator/creator 
corresponding to each input. Each allocator would create the array of memory 
segments required by the sorter for its input, and as the task keeps looping 
iteratively, it reuses the same set of memory segments created by this 
task.
    Only when we receive a termination event do we release the segments 
created by this task.
    This would ensure that we allocate memory once per task and that it is 
used throughout the life cycle of the iterative task.
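
    A rough sketch of the allocator life cycle described above, not the 
actual patch. The class name InputSegmentAllocator and its methods are 
hypothetical, and a plain byte[] stands in for a Flink memory segment so that 
the example does not depend on any Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-input allocator; byte[] is a stand-in for a memory segment. */
final class InputSegmentAllocator {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private boolean closed;

    InputSegmentAllocator(int numSegments, int segmentSize) {
        // Segments are created once, before the first superstep.
        for (int i = 0; i < numSegments; i++) {
            free.add(new byte[segmentSize]);
        }
    }

    /** Hands out n segments for the current superstep. */
    List<byte[]> acquire(int n) {
        if (closed) {
            throw new IllegalStateException("allocator is closed");
        }
        if (n > free.size()) {
            throw new IllegalStateException("not enough free segments");
        }
        List<byte[]> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(free.poll());
        }
        return out;
    }

    /** Returns segments at the end of a superstep so the next one can reuse them. */
    void putBack(List<byte[]> segments) {
        if (!closed) {
            free.addAll(segments);
        }
    }

    /** Number of segments currently available for reuse. */
    int available() {
        return free.size();
    }

    /** Called on the termination event: drops all pooled segments. */
    void close() {
        closed = true;
        free.clear();
    }
}
```

    In this sketch every superstep calls acquire() and putBack() on the same 
allocator, and close() is called only once, when the termination event 
arrives.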
    We change the merge sorters (CombiningUnilateralSortMerger and 
UnilateralSortMerger) so that we pass the MemoryAllocator to them. When the 
sorters start sorting, writing, and handling large records (if any), we pull 
the memory segments from the memory allocator. For iterative tasks, 
wherever we currently release the segments, we just need to put them back into 
the memory allocator instead of releasing them back to the memory manager.
    Only when the task receives the termination call do we forcefully close 
the allocators, so that all the created segments are released back to the 
memory manager.
    So even if preallocation of memory is set to true, I think this would 
work: we won’t request new segments from the MemoryManager’s pool and will 
instead use the segments that were created initially for the first iteration.
    For normal non-iterative tasks, we know that the allocators are created 
for non-iterative tasks: we have a Boolean that indicates whether the task is 
iterative. Based on this flag, at the point where we release the segments, we 
can decide whether to release them back to the memory manager or just put them 
back into the memory allocator (in the case of iterative tasks).
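
    The flag-based release decision could look roughly like this. It is only 
an illustration: SorterMemoryRelease is a hypothetical name, and the two lists 
are stand-ins for the per-task memory allocator and the memory manager.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that decides where released segments go. */
final class SorterMemoryRelease {
    // Stand-in for the per-task memory allocator (segments kept for reuse).
    final List<byte[]> allocatorPool = new ArrayList<>();
    // Stand-in for the memory manager (segments given up by the task).
    final List<byte[]> managerPool = new ArrayList<>();

    private final boolean iterative;

    SorterMemoryRelease(boolean iterative) {
        this.iterative = iterative;
    }

    /** Releases the given segments and empties the caller's list. */
    void release(List<byte[]> segments) {
        if (iterative) {
            // Iterative task: keep the segments for the next superstep.
            allocatorPool.addAll(segments);
        } else {
            // Non-iterative task: hand the segments back right away.
            managerPool.addAll(segments);
        }
        segments.clear();
    }
}
```

    The sorter would then call release() in the places where it releases 
segments today, without needing to know which kind of task it runs in.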
    Please note that I was able to fix the failed test cases that @ggevay 
pointed out, but I have not updated the PR yet. I can wait for your feedback 
and thoughts and then proceed with both PRs, this one and #2510.
    Points to note:
    Not sure whether this aligns with Steven's future vision of memory 
    Impact on Streaming Iterative tasks
    Will the number of memory segments needed for this task change 
dynamically? If so, the above mechanism cannot work.

> MemoryManager creates too much GC pressure with iterative jobs
> --------------------------------------------------------------
>                 Key: FLINK-3322
>                 URL: https://issues.apache.org/jira/browse/FLINK-3322
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0
>         Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (On the first run, it will spend a few minutes generating some lookup tables in /tmp.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)

This message was sent by Atlassian JIRA