Github user ramkrish86 commented on the issue:

    https://github.com/apache/flink/pull/2495
  
    @StephanEwen 
    Just trying to explain what my intention was. This is particularly aimed at addressing the sorters.
    Solution
    We could create memory allocators inside the batch task, one per input, so that every iteration task has a memory allocator/creator corresponding to each of its inputs. The allocator would create the array of memory segments required by the sorters for that input, and as the task keeps looping in an iterative fashion it reuses the same set of segments created by this task. Only when we receive a termination event do we release the segments created by this task. This would ensure that memory is allocated once per task and used throughout the life cycle of the iterative task.
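    To make this concrete, here is a minimal sketch of what such a per-input allocator could look like. The class name, method names and package paths are my assumptions, not existing Flink API; only the MemoryManager#allocatePages/#release calls are meant to mirror what the sorters ask the MemoryManager for today.

    ```java
    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    import org.apache.flink.runtime.memory.MemoryAllocationException;
    import org.apache.flink.runtime.memory.MemoryManager;

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Deque;
    import java.util.List;

    /**
     * Sketch of a per-input allocator: it pulls segments from the MemoryManager once
     * and hands out the same segments on every superstep, until close() is called on
     * the termination event.
     */
    public class MemoryAllocator {

        private final MemoryManager memoryManager;
        private final AbstractInvokable owner;

        /** Every segment obtained from the MemoryManager, kept for the final release. */
        private final List<MemorySegment> allocated = new ArrayList<>();

        /** Segments handed back by the sorters, waiting to be reused in the next superstep. */
        private final Deque<MemorySegment> available = new ArrayDeque<>();

        public MemoryAllocator(MemoryManager memoryManager, AbstractInvokable owner) {
            this.memoryManager = memoryManager;
            this.owner = owner;
        }

        /** Hands out pooled segments first and asks the MemoryManager only for the rest. */
        public List<MemorySegment> allocatePages(int numPages) throws MemoryAllocationException {
            List<MemorySegment> result = new ArrayList<>(numPages);
            while (result.size() < numPages && !available.isEmpty()) {
                result.add(available.poll());
            }
            if (result.size() < numPages) {
                List<MemorySegment> fresh =
                        memoryManager.allocatePages(owner, numPages - result.size());
                allocated.addAll(fresh);
                result.addAll(fresh);
            }
            return result;
        }

        /** "Releasing" during an iteration only puts the segments back into the pool. */
        public void recyclePages(Collection<MemorySegment> pages) {
            available.addAll(pages);
        }

        /** Called on the termination event: everything goes back to the MemoryManager. */
        public void close() {
            memoryManager.release(allocated);
            allocated.clear();
            available.clear();
        }
    }
    ```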
    We change the merge sorters (CombiningUnilateralSortMerger and UnilateralSortMerger) so that the MemoryAllocator is passed to them. When the sorters need memory for sorting, writing, and handling large records (if any), they pull the segments already allocated by the memory allocator. For iterative tasks, wherever we release segments we simply put them back into the memory allocator instead of releasing them to the memory manager. Only when the task receives the termination call do we forcefully close the allocators, so that all the created segments are released back to the memory manager.
    So I think this would work even if preallocation of memory is set to true: we would not be requesting new segments from the MemoryManager's pool but instead reusing the segments that were created for the first iteration.
    Allocators are created for normal non-iterative tasks as well. We keep a boolean indicating whether the task is iterative or not, and based on this flag, at the point where we release the segments, we decide whether to release them back to the memory manager or only put them back into the memory allocator (in the case of iterative tasks) - see the sketch below.
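    As a minimal sketch of that release decision (the SegmentReleaser helper and the iterativeTask flag below are illustrations only, not existing sorter code; they just show where the flag would plug into the sorters' release path):

    ```java
    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.runtime.memory.MemoryManager;

    import java.util.List;

    /**
     * Sketch of the release path: wherever the sorters free their sort/write/
     * large-record memory today, the flag decides whether the segments go back
     * into the per-input MemoryAllocator (for reuse in the next superstep) or
     * straight back to the MemoryManager.
     */
    final class SegmentReleaser {

        private final MemoryManager memoryManager;
        private final MemoryAllocator allocator;  // the per-input allocator sketched earlier
        private final boolean iterativeTask;      // the proposed boolean flag

        SegmentReleaser(MemoryManager memoryManager, MemoryAllocator allocator, boolean iterativeTask) {
            this.memoryManager = memoryManager;
            this.allocator = allocator;
            this.iterativeTask = iterativeTask;
        }

        /** Replaces the direct memoryManager.release(...) call in the sorter's release path. */
        void release(List<MemorySegment> segments) {
            if (iterativeTask) {
                allocator.recyclePages(segments);   // keep the segments for the next superstep
            } else {
                memoryManager.release(segments);    // normal task: free the memory right away
            }
        }

        /** On the termination event of an iterative task, give everything back for good. */
        void onTermination() {
            if (iterativeTask) {
                allocator.close();
            }
        }
    }
    ```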
    
    Please note that I was able to fix the failing test cases that @ggevay pointed out, but I have not yet updated the PR. I can wait for your feedback and thoughts and then proceed with both PRs - this one and #2510.
    Points to note:
    - Not sure whether this aligns with Stephan's future vision of memory management.
    - Impact on streaming iterative tasks.
    - Will the number of memory segments needed by this task change dynamically? If so, the above mechanism cannot work.


