Actually, this is the first rework of the state interface. There is only
one released version so far.
What we are doing here is trying to make sure that this first rework will
most likely also be the last for the foreseeable future.


From the use cases I can think of, we need at least two different state
checkpointing methods:


1) The variant where state is abstracted as a key/value interface. This is
the new partitionable state interface.
    What gets backed up is exactly what you put into the state. There is no
need to make the operator aware of when checkpoints happen.
    Most simple applications should be able to work against this interface.
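
To illustrate variant (1), here is a minimal sketch in Java. It is not the actual partitionable state interface: the names KeyValueState, InMemoryKeyValueState, and WordCounter are hypothetical and only show the idea that user code reads and writes state while the runtime alone decides when to copy it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key/value state handle; NOT Flink's actual interface.
interface KeyValueState<K, V> {
    V get(K key);
    void put(K key, V value);
}

// Toy in-memory backend (an assumption for illustration only).
class InMemoryKeyValueState<K, V> implements KeyValueState<K, V> {
    private Map<K, V> entries = new HashMap<>();

    public V get(K key) { return entries.get(key); }

    public void put(K key, V value) { entries.put(key, value); }

    // Called by the (hypothetical) runtime on a checkpoint.
    // It copies exactly what was put into the state, nothing else.
    Map<K, V> snapshot() { return new HashMap<>(entries); }

    // Called by the (hypothetical) runtime on recovery.
    void restore(Map<K, V> snapshot) { entries = new HashMap<>(snapshot); }
}

// User code: it has no checkpoint callback and never learns when one happens.
class WordCounter {
    private final KeyValueState<String, Long> counts;

    WordCounter(KeyValueState<String, Long> counts) { this.counts = counts; }

    long count(String word) {
        Long current = counts.get(word);
        long updated = (current == null ? 0L : current) + 1;
        counts.put(word, updated);
        return updated;
    }
}
```

The operator stays unaware of checkpoints; snapshot() and restore() belong to the backend, which is why different backends could be swapped in without touching user code.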


2) The variant where the user code gets a call onCheckpoint() (currently
snapshotState()) and returns whatever it wants to be persisted.
    This is important if the streaming flow interacts with outside systems
and wants to "groupCommit" data on checkpoints.

    The crucial thing here is that the value to be persisted by Flink may
in some cases not be the actual data, because that data has already been
periodically inserted into the external system.
    The checkpointed value is then only a key, epoch counter, or transaction ID
that allows you to mark what has been inserted into the external system as
part of that checkpoint.

    This call to "onCheckpoint()" is not best-effort, but crucial and needs
to succeed if a checkpoint is to be successful. Best effort is only
"notifyCompleteCheckpoint()".
    And we could make this message "at-least-once", if that is needed for
reliable interaction with the outside world.
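
The "groupCommit" pattern described above can be sketched roughly as follows. This is an illustration under assumptions, not an existing Flink sink: ExternalStore and GroupCommitSink are hypothetical, snapshotState() uses the signature quoted later in this thread, and notifyCompleteCheckpoint() simply follows the name used in this mail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an external system that groups writes by epoch.
class ExternalStore {
    final Map<Long, List<String>> pending = new HashMap<>();
    final List<String> committed = new ArrayList<>();

    void write(long epoch, String record) {
        pending.computeIfAbsent(epoch, e -> new ArrayList<>()).add(record);
    }

    // Idempotent: committing an epoch twice is harmless, so an
    // at-least-once notification is safe.
    void commit(long epoch) {
        List<String> records = pending.remove(epoch);
        if (records != null) {
            committed.addAll(records);
        }
    }

    // Drops everything written after the given epoch.
    void abortAfter(long epoch) {
        pending.keySet().removeIf(e -> e > epoch);
    }
}

// Hypothetical sink: the data goes to the external store right away,
// and only the epoch counter is handed to the checkpoint.
class GroupCommitSink {
    private final ExternalStore store;
    private final Map<Long, Long> epochByCheckpoint = new HashMap<>();
    private long currentEpoch = 0;

    GroupCommitSink(ExternalStore store) { this.store = store; }

    void invoke(String record) { store.write(currentEpoch, record); }

    // Must succeed for the checkpoint to succeed; returns only the epoch ID.
    Long snapshotState(long checkpointId, long checkpointTimestamp) {
        long closedEpoch = currentEpoch++;
        epochByCheckpoint.put(checkpointId, closedEpoch);
        return closedEpoch;
    }

    // Best-effort notification; may arrive more than once.
    void notifyCompleteCheckpoint(long checkpointId) {
        Long epoch = epochByCheckpoint.remove(checkpointId);
        if (epoch != null) {
            store.commit(epoch);
        }
    }

    // On recovery: re-commit the checkpointed epoch in case the notification
    // was lost, and discard whatever was written after it.
    void restoreState(Long closedEpoch) {
        store.commit(closedEpoch);
        store.abortAfter(closedEpoch);
        currentEpoch = closedEpoch + 1;
    }
}
```

Note that the sketch only works because commit() is idempotent; without that property, a repeated or replayed notification would duplicate data in the external system.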

    At the last Flink meetup in the Bay Area, we had quite a discussion
with some people about how powerful interface (2) is when trying to get
"exactly-once" with external systems.

    Also, with this interface, it is quite straightforward to make
asynchronous snapshotting possible, and it can be extended to incremental
snapshotting. It is not obvious to me how the same would work with the
annotation variant.


Concerning the annotated state:

That is eye candy and nice. Would it hurt to have this and promote it as a
"shortcut" to a state backup implementation using (2), where the
"snapshotState" method would simply return the value of some fields?

I know we should not offer too many different ways of doing things, but if
we promote (2) as "2-general" (interface) and "2-shortcut" (annotation), I
see no problem.
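
A rough sketch of what such a "2-shortcut" could look like, built on top of the general variant with plain Java reflection. The @State annotation and the AnnotatedCheckpointed base class are made up for this illustration; they are not existing Flink classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker annotation for fields that should be checkpointed.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface State {}

// Hypothetical base class: implements the general snapshot/restore calls
// by collecting and re-setting the annotated fields of the subclass.
abstract class AnnotatedCheckpointed {

    public Map<String, Object> snapshotState(long checkpointId, long checkpointTimestamp) {
        Map<String, Object> snapshot = new HashMap<>();
        for (Field field : getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(State.class)) {
                try {
                    field.setAccessible(true);
                    // Stores the current value; mutable objects would need a copy.
                    snapshot.put(field.getName(), field.get(this));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot read field " + field.getName(), e);
                }
            }
        }
        return snapshot;
    }

    public void restoreState(Map<String, Object> state) {
        for (Field field : getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(State.class) && state.containsKey(field.getName())) {
                try {
                    field.setAccessible(true);
                    field.set(this, state.get(field.getName()));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot write field " + field.getName(), e);
                }
            }
        }
    }
}

// User code: keeps its fields as they are and only adds the annotation.
class CountingMapper extends AnnotatedCheckpointed {
    @State private long count;
    private long notCheckpointed;

    String map(String value) {
        count++;
        notCheckpointed++;
        return value;
    }

    long getCount() { return count; }
}
```

Users who need derived state or external commits would skip the base class and implement the general calls themselves, so both flavors end up going through the same mechanism.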


Greetings,
Stephan




On Wed, Jul 1, 2015 at 11:59 AM, Robert Metzger <rmetz...@apache.org> wrote:

> I agree, if we want to change the interface, now is the best time.
>
> So you are suggesting to change the methods in the Checkpointed interface
> from
>
> T snapshotState(long checkpointId, long checkpointTimestamp) throws
> Exception;
>
> void restoreState(T state);
>
> to
>
> void onSnapshot(id, ts)
> void onRestore(id, ts)
> (+ user has to annotate checkpointed fields)
>
> I would say that the current interface is more powerful than what you
> are proposing (arguments will follow)
> I don't think that there is an advantage in usability for the user
> with the new methods (but that is a matter of taste ... )
>
> I think that the current interface is more powerful because it allows
> you to give the system a derived state to back up, instead of just the
> value of a variable. With annotated fields, you would need to always keep
> the derived state up to date so that the system can back it up when it
> needs to. With the method, you can compute it only on demand.
> For the restore method, with the old interface, you can do sanity
> checks on the state to restore (something the only user of these
> interfaces (the kafka source) is actually doing). With your proposed
> interface, I would need to validate data from a field.
> The proposed restore method would also make it harder to restore from
> a derived state.
>
>
> On Wed, Jul 1, 2015 at 11:38 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
> > I understand your concerns Robert but I don't fully agree.
> >
> > The Checkpointed interface works indeed but there are so many use cases
> > that it is not suitable for in the long run, and also the whole interface
> > is slightly awkward in my opinion when returning simple fields which are
> > already serializable.
> >
> > This motivated the introduction of the OperatorStateInterface which you can
> > call the first rework of the checkpointed interface, but I see that as the
> > first version which is actually capable of handling many issues that were
> > obvious with the Checkpointed interfaces.
> >
> > This is actually not only a rework of the interface but the rework of the
> > state concept and runtime handling. This needs to be clean if we are moving
> > streaming out of beta, and should provide the needed functionality. I think
> > we can afford to experiment around a little bit with these interfaces and
> > see the implications for the applications that we can develop with them, as
> > we think of statefulness as a major advantage of Flink streaming.
> >
> > So actually I think this is the only time when we can afford to rework these
> > interfaces without big costs to make it work for the future.
> >
> >
> >
> > On Wed, Jul 1, 2015 at 11:25, Robert Metzger <rmetz...@apache.org> wrote:
> >
> > > What's causing me the biggest headache here is that I don't see an end to
> > > all these "state interface" reworks.
> > > I think this is now the third big change to the interface.
> > > It is a horrible user experience to rework your old code with each new
> > > Flink release.
> > >
> > > I understand that there are always ways to improve interfaces, and I'm sure
> > > Flink has many that we can improve.
> > > But there are (in my opinion) more important things than reworking the
> > > interfaces every second week ... for example that the functionality they
> > > are providing is actually working and well tested.
> > >
> > >
> > >
> > > On Wed, Jul 1, 2015 at 11:15 AM, Ufuk Celebi <u...@apache.org> wrote:
> > >
> > > >
> > > > On 01 Jul 2015, at 10:57, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > Thanks for the feedback guys:
> > > > >
> > > > > @Max: You are right, this is not a top priority to change, I was just
> > > > > mocking up some alternatives to try to make the state usage even simpler so
> > > > > that the user can keep his current implementations and just add 1-2
> > > > > annotations.
> > > >
> > > > I agree. It's good to cover the "basic" case with a simple solution. :-)
> > > >
> > > > > @Stephan, Robert: You are right that the checkpointed interface has some
> > > > > advantages from that point of view. Maybe a way to go would be to separate
> > > > > this signaling functionality (when the checkpoint is taken and maybe also
> > > > > the commits) from the snapshotting itself. One advantage I see there is
> > > > > that we would not need to have 3 different interfaces doing pretty much the
> > > > > same thing (OperatorState - needed for partitioned state and different
> > > > > backends/out-of-core, Checkpointed - needed for special actions after
> > > > > checkpoints, Annotations - checkpointing simple fields natively).
> > > >
> > > > I also agree with Stephan and Robert that there are other use cases, which
> > > > require the interfaces. I cannot judge your proposal at this point though.
> > > > I'm eager to hear what the others say who worked on this.
> > > >
> > > > – Ufuk
> > >
> >
>
