GitHub user gyfora commented on the pull request:
https://github.com/apache/flink/pull/1668#issuecomment-186142276
Thanks, Paris. I like the idea: it's a correct modification of the original
algorithm that makes it much easier to implement on Flink, at the price of
buffering more records.
I have some comments on the implementation:
- Why did you introduce the ForwardingOneInputStreamTask if only the
iteration tail will implement this behaviour? I am not sure other
StreamTasks will need it in the future, so it might be better to put the
logic directly in the StreamIterationTail instead of adding a new abstraction.
- I think the RecordPusher and UpstreamLogger operators embedded in the
head and tail tasks are not really necessary; they only add complexity
to debugging and understanding the code. The UpstreamLogger only makes
checkpointing "easier" for the implementer, but we probably want custom
logic there anyway. So I would just override the
checkpointStatesInternal method directly.
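To illustrate the suggested structure, here is a minimal sketch, assuming a heavily simplified task model. `SketchStreamTask` and `SketchIterationTail` are hypothetical names, not Flink classes, and the hook's signature is an assumption that only borrows the name checkpointStatesInternal from the comment. The point is that forwarding and logging live directly in the tail task and are snapshotted by an overridden hook, with no extra task subclass or embedded logger operator. The exact logging protocol of the PR is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a stream task; NOT Flink's StreamTask API.
abstract class SketchStreamTask<T> {
    // Called for every record arriving on the task's input.
    abstract void processRecord(T record);

    // Checkpoint hook; the name mirrors checkpointStatesInternal from the
    // discussion, but this signature is an assumption for illustration.
    abstract List<T> checkpointStatesInternal(long checkpointId);
}

// Hypothetical iteration tail: the forwarding and logging logic sits directly
// in the tail task instead of in a separate abstraction or embedded operator.
class SketchIterationTail<T> extends SketchStreamTask<T> {
    private final List<T> forwarded = new ArrayList<>();   // stands in for the feedback channel to the head
    private final List<T> inFlightLog = new ArrayList<>(); // records logged while a checkpoint is pending
    private boolean logging = false;

    // Assumed trigger: start logging records once a checkpoint has begun.
    void startLogging() {
        logging = true;
    }

    @Override
    void processRecord(T record) {
        if (logging) {
            inFlightLog.add(record); // buffered so it can become part of the snapshot
        }
        forwarded.add(record);       // records are always forwarded as usual
    }

    @Override
    List<T> checkpointStatesInternal(long checkpointId) {
        // Snapshot the logged records, then reset for the next checkpoint.
        List<T> snapshot = new ArrayList<>(inFlightLog);
        inFlightLog.clear();
        logging = false;
        return snapshot;
    }

    List<T> forwarded() {
        return forwarded;
    }
}
```

Keeping the hook in the task itself means a debugger or reader only has to follow one class to see what is forwarded and what ends up in the snapshot.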
I agree that we definitely need to spill the buffered records to disk or
use managed memory for them. This can be similar to the BarrierBuffer logic.
We can combine this with checkpointing to an output stream so that we never
have to materialize the full state.
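As a rough illustration of spilling combined with streaming checkpoints, here is a minimal sketch, assuming records are plain strings and a local temp file stands in for spill storage. `SpillingRecordBuffer` is a hypothetical class; it does not use Flink's BarrierBuffer or managed-memory APIs. Records beyond an in-memory budget are appended to disk, and the snapshot copies them one at a time to the checkpoint stream, so the whole log is never held on the heap at once.

```java
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical spillable record buffer (not Flink's BarrierBuffer or managed memory).
class SpillingRecordBuffer implements Closeable {
    private final int maxInMemory;
    private final List<String> inMemory = new ArrayList<>();
    private Path spillFile;            // created lazily on the first spill
    private DataOutputStream spillOut;
    private int spilledCount = 0;

    SpillingRecordBuffer(int maxInMemory) {
        this.maxInMemory = maxInMemory;
    }

    // Keeps records on the heap until the budget is reached, then appends to disk.
    void add(String record) {
        try {
            if (inMemory.size() < maxInMemory) {
                inMemory.add(record);
                return;
            }
            if (spillOut == null) {
                spillFile = Files.createTempFile("iteration-log", ".spill");
                spillOut = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(spillFile)));
            }
            spillOut.writeUTF(record);
            spilledCount++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int spilledCount() {
        return spilledCount;
    }

    // Streams all buffered records, in arrival order, to the checkpoint output.
    // Spilled records are read back and copied one by one, never loaded as a whole.
    void writeTo(DataOutputStream checkpointOut) {
        try {
            checkpointOut.writeInt(inMemory.size() + spilledCount);
            for (String r : inMemory) {
                checkpointOut.writeUTF(r);
            }
            if (spillOut != null) {
                spillOut.flush(); // make sure every spilled record is on disk
                try (DataInputStream in = new DataInputStream(
                        new BufferedInputStream(Files.newInputStream(spillFile)))) {
                    for (int i = 0; i < spilledCount; i++) {
                        checkpointOut.writeUTF(in.readUTF());
                    }
                }
            }
            checkpointOut.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Restores the record list written by writeTo (used here only to check the format).
    static List<String> readFrom(DataInputStream in) {
        try {
            int n = in.readInt();
            List<String> records = new ArrayList<>(n);
            for (int i = 0; i < n; i++) {
                records.add(in.readUTF());
            }
            return records;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            if (spillOut != null) {
                spillOut.close();
                Files.deleteIfExists(spillFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, with a budget of 2 and five added records, three go to the spill file, and `writeTo` still emits all five in arrival order. A real implementation would presumably use managed memory segments and serializers instead of a temp file and `writeUTF`.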