[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15505669#comment-15505669
 ] 

ASF GitHub Bot commented on FLINK-3322:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79539655
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
                                ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
                // create and return joining iterator according to provided local strategy.
    -           if (objectReuseEnabled) {
    -                   switch (ls) {
    -                           case INNER_MERGE:
    -                                   this.joinIterator = new ReusingMergeInnerJoinIterator<>(in1, in2, 
    +           if (reset) {
    +                   resetForIterativeTasks(in1, in2, serializer1, serializer2, comparator1, comparator2, pairComparatorFactory);
    +                   reset = false;
    +           }
    +           if (joinIterator == null) {
    --- End diff --
    
    @ggevay 
    I can now say why I went with the `reset` boolean. In
    `PageRankITCase#testPageRankSmallNumberOfIterations`, because of the way the
    input is closed and opened again, things do not work if we reset the
    iterators in the `reset()` method: at that point the input still refers to
    the older input iterators. If the reset is done in `prepare()` instead, it
    works fine. I can see that this example uses TempBarriers, hence the change
    in behaviour.
    So by going with a boolean, I could at least show that `reset()` has some
    implementation and that those drivers act based on the `reset` flag.
    That is also why, if you look at the implementation of (for example)
    `JoinWithSolutionSetFirstDriver`, `reset()` is left empty and the inputs are
    updated in the `run()` method. IMHO, having `reset()`, `prepare()` and
    `initialize()` is a bit confusing and needs to be refactored.
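
    To illustrate the flag-based approach described above, here is a minimal
    sketch. The class and its members (`ResetFlagDriverSketch`, `buffer`,
    `allocations`) are hypothetical stand-ins and not Flink's actual driver
    code; the `buffer` list only plays the role of the expensive join iterator.
    The point is that `reset()` merely records the request, while `prepare()`
    does the actual work, because only there are the current inputs available.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the reset-flag pattern; not Flink's JoinDriver. */
class ResetFlagDriverSketch {
    private boolean reset = false;    // set by reset(), consumed by prepare()
    private List<Integer> buffer;     // stands in for the expensive join iterator
    private Iterator<Integer> input;  // input of the current superstep
    private int allocations = 0;      // how often the expensive state was built

    /** Only records that a reset was requested; inputs may still be stale here. */
    void reset() {
        reset = true;
    }

    /** Called at the start of every superstep with the freshly opened input. */
    void prepare(Iterator<Integer> currentInput) {
        if (reset) {
            // Reuse the existing state instead of reallocating it.
            if (buffer != null) {
                buffer.clear();
            }
            reset = false;
        }
        if (buffer == null) {
            // First superstep: build the expensive state exactly once.
            buffer = new ArrayList<>();
            allocations++;
        }
        // Always bind to the current input, never to an older iterator.
        input = currentInput;
    }

    /** Consumes the current input and returns how many records were seen. */
    int run() {
        while (input.hasNext()) {
            buffer.add(input.next());
        }
        return buffer.size();
    }

    int allocations() {
        return allocations;
    }
}
```

    With this shape, calling `prepare()`, `run()`, `reset()`, `prepare()`,
    `run()` across two supersteps builds the state only once, and the second
    `run()` sees only the second superstep's input.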


> MemoryManager creates too much GC pressure with iterative jobs
> --------------------------------------------------------------
>
>                 Key: FLINK-3322
>                 URL: https://issues.apache.org/jira/browse/FLINK-3322
>             Project: Flink
>          Issue Type: Bug
>          Components: Local Runtime
>    Affects Versions: 1.0.0
>            Reporter: Gabor Gevay
>            Assignee: ramkrishna.s.vasudevan
>            Priority: Critical
>             Fix For: 1.0.0
>
>         Attachments: FLINK-3322.docx, FLINK-3322_reusingmemoryfordrivers.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
