Github user gyfora commented on the pull request:

    https://github.com/apache/flink/pull/747#issuecomment-113890106
  
    Thanks for the nice feedback, Stephan. What you have written makes a lot of 
sense :+1: 
    
    Let me quickly react to these points separately:
    
    ### Current State Interface
    I am okay with keeping the current Checkpointed interfaces, but I wonder if 
they are equivalent to the simple non-partitioned state abstractions. 
    
    In any case, I want to do some refactoring to make partitioned and 
non-partitioned states more transparent to the runtime and to allow mixing 
them. This will probably mean adding a boolean flag to the 
getOperatorState(...) method to request either partitioned or non-partitioned 
state (requesting partitioned state would throw an exception if the stream is 
not keyed).
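
    A rough sketch of how such a flag could behave. Everything here is 
hypothetical: SketchState, SimpleState, PartitionedState, and 
SketchRuntimeContext are illustrative names, not Flink's actual classes, and 
the signature of getOperatorState(...) is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical state handle; not Flink's real interface.
interface SketchState<T> {
    T getState();
    void updateState(T value);
}

// Non-partitioned state: one value per operator instance.
class SimpleState<T> implements SketchState<T> {
    private T value;

    SimpleState(T defaultValue) {
        this.value = defaultValue;
    }

    @Override public T getState() { return value; }
    @Override public void updateState(T newValue) { this.value = newValue; }
}

// Partitioned state: one value per key, resolved against the current key.
class PartitionedState<T> implements SketchState<T> {
    private final Map<Object, T> perKey = new HashMap<>();
    private final T defaultValue;
    private final Supplier<Object> currentKey;

    PartitionedState(T defaultValue, Supplier<Object> currentKey) {
        this.defaultValue = defaultValue;
        this.currentKey = currentKey;
    }

    @Override public T getState() {
        return perKey.getOrDefault(currentKey.get(), defaultValue);
    }

    @Override public void updateState(T newValue) {
        perKey.put(currentKey.get(), newValue);
    }
}

// Hypothetical runtime context exposing the boolean flag.
class SketchRuntimeContext {
    private final boolean keyedStream;
    private Object currentKey; // assumed to be set by the runtime per record

    SketchRuntimeContext(boolean keyedStream) {
        this.keyedStream = keyedStream;
    }

    void setCurrentKey(Object key) {
        this.currentKey = key;
    }

    <T> SketchState<T> getOperatorState(T defaultValue, boolean partitioned) {
        if (!partitioned) {
            return new SimpleState<T>(defaultValue);
        }
        if (!keyedStream) {
            throw new IllegalStateException(
                "Partitioned state requires a keyed stream");
        }
        return new PartitionedState<T>(defaultValue, () -> currentKey);
    }
}
```

    With this shape, one operator can hold both kinds of state side by side, 
and only the partitioned request depends on the stream being keyed.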
    
    ### State Backing / Checkpointing & Restoring
    This is a very good idea, I like it :) I think a slight refactoring would 
allow us to specify any action to be taken during a snapshot. The action would 
either return the state handles, to be stored as is done right now, or return 
null after doing the proper interaction with the external store. I think it is 
safe to say that this could be an extension to be added later. 
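
    One way to picture the "handle or null" contract, as a sketch only. 
SnapshotAction, HandleReturningSnapshot, ExternalWriteSnapshot, and 
CheckpointBookkeeping are made-up names, and plain maps stand in for both the 
state handles and the external store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical hook: returns a handle to store, or null if the action
// already persisted the state somewhere else.
interface SnapshotAction<S, H> {
    H snapshot(S state, long checkpointId);
}

// Variant 1: hand a copy of the state back to the runtime as the handle.
class HandleReturningSnapshot
        implements SnapshotAction<Map<String, Integer>, Map<String, Integer>> {
    @Override
    public Map<String, Integer> snapshot(Map<String, Integer> state, long checkpointId) {
        return new HashMap<>(state);
    }
}

// Variant 2: write to an external store (a map here) and return null.
class ExternalWriteSnapshot
        implements SnapshotAction<Map<String, Integer>, Map<String, Integer>> {
    final Map<Long, Map<String, Integer>> externalStore = new HashMap<>();

    @Override
    public Map<String, Integer> snapshot(Map<String, Integer> state, long checkpointId) {
        externalStore.put(checkpointId, new HashMap<>(state));
        return null;
    }
}

// Runtime side: only non-null handles are kept with the checkpoint.
class CheckpointBookkeeping {
    final Map<Long, Map<String, Integer>> handles = new HashMap<>();

    void takeSnapshot(
            SnapshotAction<Map<String, Integer>, Map<String, Integer>> action,
            Map<String, Integer> state,
            long checkpointId) {
        Map<String, Integer> handle = action.snapshot(state, checkpointId);
        if (handle != null) {
            handles.put(checkpointId, handle);
        }
    }
}
```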
    
    I see some immediate issues with the group commit logic for the external 
store. I wonder when we would actually do the commit: the fact that the 
operator takes a snapshot doesn't mean that the checkpoint will be completed, 
so we might need to roll back. This assumes that we can roll back on the 
external store as well, which is a strong assumption. Another way would be to 
commit the changes through the checkpoint committer interface (like in the 
KafkaSource), but then if we fail before committing we end up with a corrupted 
snapshot: the system thinks it is complete, but in reality the external storage 
doesn't contain the changes, so there is no way to restore.
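
    The second option can be sketched as follows, assuming updates are staged 
locally at snapshot time and written out only when a completion notification 
arrives. ExternalStoreSketch and StagedStateWriter are hypothetical, and the 
method names only mimic a committer-style callback. The sketch avoids rolling 
back the external store, but it does not close the gap described above: a 
failure after the checkpoint is declared complete and before 
notifyCheckpointComplete(...) runs still leaves the store without the changes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Stand-in for an external key-value store without rollback support.
class ExternalStoreSketch {
    final Map<String, String> committed = new HashMap<>();

    void commit(Map<String, String> batch) {
        committed.putAll(batch);
    }
}

class StagedStateWriter {
    private final ExternalStoreSketch store;
    // Updates made since the last snapshot.
    private Map<String, String> pending = new HashMap<>();
    // Batches frozen at snapshot time, keyed by checkpoint ID.
    private final TreeMap<Long, Map<String, String>> staged = new TreeMap<>();

    StagedStateWriter(ExternalStoreSketch store) {
        this.store = store;
    }

    void update(String key, String value) {
        pending.put(key, value);
    }

    // Taking a snapshot only freezes the batch; nothing is written yet.
    void snapshot(long checkpointId) {
        staged.put(checkpointId, pending);
        pending = new HashMap<>();
    }

    // Commit every staged batch up to and including the completed
    // checkpoint, oldest first, so later values overwrite earlier ones.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, Map<String, String>> ready =
            staged.headMap(checkpointId, true);
        for (Map<String, String> batch : ready.values()) {
            store.commit(batch);
        }
        ready.clear(); // the view is backed by 'staged'
    }

    // On failure, uncommitted batches are dropped locally, so the external
    // store itself never needs to roll back.
    void discardUncommitted() {
        staged.clear();
        pending = new HashMap<>();
    }
}
```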
    
    ### API integration
    I agree completely. We need to allow mixing partitioned and 
non-partitioned states, as I said in the first point.
    
    ### Asynchronous state checkpoints
    Agreed.
    
    ### Incremental state checkpoints
    Agreed, and I even think we could support per-key incremental snapshots 
where that is absolutely necessary (for window buffers, for example). This 
could be an interface that incrementally computes the delta after each 
update(...) call. This might be tricky, but it should be possible.
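
    A minimal sketch of the per-key delta idea, assuming the delta is simply 
the set of keys written since the last snapshot. DeltaTrackingState is a 
hypothetical class, and deletions are left out to keep the example short.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DeltaTrackingState<K, V> {
    private final Map<K, V> current = new HashMap<>();
    // Keys changed since the last snapshot, with their latest values.
    private Map<K, V> delta = new HashMap<>();

    // Every update also records the change in the running delta.
    void update(K key, V value) {
        current.put(key, value);
        delta.put(key, value);
    }

    V get(K key) {
        return current.get(key);
    }

    // An incremental snapshot hands out only the changes and starts a
    // fresh delta for the next checkpoint interval.
    Map<K, V> snapshotDelta() {
        Map<K, V> out = delta;
        delta = new HashMap<>();
        return out;
    }

    // Restoring replays the deltas in checkpoint order.
    static <A, B> Map<A, B> restore(List<Map<A, B>> deltasInOrder) {
        Map<A, B> rebuilt = new HashMap<>();
        for (Map<A, B> d : deltasInOrder) {
            rebuilt.putAll(d);
        }
        return rebuilt;
    }
}
```

    The cost of a snapshot then scales with the number of keys touched since 
the previous one rather than with the total state size, at the price of 
needing the whole chain of deltas (or periodic full snapshots) on restore.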
    
    ### Shared replicated state
    This would be very cool indeed :)
    
    
    I will start doing some refactoring to make it possible to incorporate the 
suggested changes.

