Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2510#discussion_r79539655
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java 
---
    @@ -128,17 +130,22 @@ public void prepare() throws Exception{
                                
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
     
                // create and return joining iterator according to provided 
local strategy.
    -           if (objectReuseEnabled) {
    -                   switch (ls) {
    -                           case INNER_MERGE:
    -                                   this.joinIterator = new 
ReusingMergeInnerJoinIterator<>(in1, in2, 
    +           if (reset) {
    +                   resetForIterativeTasks(in1, in2, serializer1, 
serializer2, comparator1, comparator2, pairComparatorFactory);
    +                   reset = false;
    +           }
    +           if (joinIterator == null) {
    --- End diff --
    
    @ggevay 
    I can now say why I went with the `reset` boolean. In case of 
`PageRankITCase#testPageRankSmallNumberOfIterations' the way the input is 
closed and opened again - things does not work if we allow the iterators to be 
reset in the `reset' method. Because the input is referring to the older input 
iterators. Instead of it is done in the `prepare` it works fine. I can see that 
this eg uses the TempBarriers and hence the change in behaviour.
    Hence when i went with a boolean way I could atleast show that reset is 
having some impl and those drivers work on the action based on 'reset'. 
    That is why if you see the impl of (for eg) 
`JoinWithSolutionSetFirstDriver` the reset() is left empty and also the 
updation of the inputs happen in the run() method. IMHO  having `reset()`, 
`prepare()` and `initialize()` is bit confusing and needs to be refactored. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to