GitHub user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/1668#issuecomment-186142276
  
    Thanks Paris, I like the idea. It is a correct modification of the original 
algorithm that makes it much easier to implement on Flink, at the price of 
buffering more records. 
    
    I have some comments on the implementation:
    
    - Why did you introduce the ForwardingOneInputStreamTask if only the 
iteration tail will implement this behaviour? I am not sure whether other 
StreamTasks will do this in the future, so it might be better to just put the 
logic in the StreamIterationTail instead of adding a new abstraction.
    - I think the RecordPusher and UpstreamLogger operators embedded in the 
head and tail tasks are not really necessary and just make the code harder to 
debug and understand. The UpstreamLogger only makes checkpointing "easier" for 
the implementer, but we probably want to do some custom logic there anyway. So 
I would probably just override the checkpointStatesInternal method directly.
    
    I agree that we definitely need to spill the buffered records to disk or 
use managed memory for this. This can be similar to the BarrierBuffer logic. We 
can combine this with checkpointing to an output stream, so that we never have 
to materialize the full state.
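
    To illustrate the last point, here is a rough sketch of what I mean by 
writing the logged records straight to an output stream. StreamingRecordLog and 
all of its methods are hypothetical names made up for this comment, not 
existing Flink classes, and the sketch assumes the records are already 
serialized to byte arrays. It only shows the length-prefixed framing idea, not 
how it would hook into the actual checkpointing code.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): logs records directly to a stream. */
class StreamingRecordLog {

    private final DataOutputStream out;
    private int count;

    /** The stream is assumed to be a checkpoint or spill-file stream owned by the caller. */
    StreamingRecordLog(OutputStream checkpointStream) {
        this.out = new DataOutputStream(checkpointStream);
    }

    /** Writes one serialized record as a length-prefixed frame; no copy is kept on the heap. */
    void append(byte[] serializedRecord) throws IOException {
        out.writeInt(serializedRecord.length);
        out.write(serializedRecord);
        count++;
    }

    /** Number of records written so far. */
    int size() {
        return count;
    }

    /** Flushes pending bytes; closing the underlying stream is left to the caller. */
    void finish() throws IOException {
        out.flush();
    }

    /**
     * Reads the frames back in the order they were written. Collecting them
     * into a list is for demonstration only; a real restore would re-emit
     * each record as it is read. A frame header cut off by the end of the
     * stream is treated as the end of the log.
     */
    static List<byte[]> replay(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        List<byte[]> records = new ArrayList<>();
        while (true) {
            int length;
            try {
                length = data.readInt();
            } catch (EOFException endOfLog) {
                return records;
            }
            byte[] record = new byte[length];
            data.readFully(record);
            records.add(record);
        }
    }
}
```

    With something like this, the tail would call append for every record it 
sees while a checkpoint is in progress, so the memory footprint stays constant 
no matter how many records are in flight.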

