Tim,

If the operator is such that I can discard the state from the previous
checkpoint to the current checkpoint on the checkpointed callback then the
state can be reconstructed by the incoming data on replay and I don't need
to wait till committed to remove the un-wanted state. Its more efficient
memory wise.

Thanks

On Mon, Nov 23, 2015 at 11:17 AM, Timothy Farkas <[email protected]>
wrote:

> Pramod,
>
> Very good point about checkpointed being called within a window boundary.
> But I still don't understand the purpose of calling checkpointed after the
> asynchronous copy to HDFS is completed. It makes checkpointed
> implementations more complex, and it does not provide any benefits. The
> checkpointed implementation logic could be called multiple times for the
> same window if the operator fails and is restored to an earlier checkpoint,
> this fact invalidates any atomicity gaurantees that you are trying to
> preserve. I would like a concrete example of when it is necessary to call
> checkpointed after the asynchronous copy to hdfs. The only use case I can
> think of is if the operator reads its own checkpointed state from hdfs in
> the checkpointed call back, but this use case is not a valid use case.
>
> Tim
>
> On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <[email protected]>
> wrote:
>
> > There is a problem I see with the older way in which checkpointed was
> > called as well which is relevant to this. It is called outside of window
> > boundary between end window and begin window which I don't think is the
> > correct as platform should not run operator business logic outside of
> > window boundaries. The operator could end up sending tuples accidentally
> in
> > this call for example.
> >
> > I think we need to fix that and also not assume that checkpointed
> callback
> > will be called immediately after end window of the window being
> > checkpointed.
> >
> > Thanks
> >
> > On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <[email protected]
> >
> > wrote:
> >
> > > :-) I did not write it that semantics. Looks like someone made that
> > > assumption along the way. Fix the code which assumes that.
> > >
> > > What  do you mean?
> > > We go on fixing all the code which was written because the behavior
> with
> > > Synchronous checkpointing was
> > > beginWindow(x) -> endWindow -> checkpointWindow(x)   ( when
> checkpointing
> > > is aligned with application window boundary)
> > >
> > > This was broken recently with async checkpointing but instead of fixing
> > > that, we go on fixing all the existing solutions.
> > > I guess I find this ridiculous.
> > >
> > > And why should we make it harder and harder to write operators?
> > > I think majority of people who participated in this discussion do
> believe
> > > that we need to fix and I think this is critical.
> > >
> > > Chandni
> > >
> > >
> > >
> > > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > > [email protected]> wrote:
> > >
> > > >
> > > >
> > > > On 11/23/15, 8:44 AM, "Chandni Singh" <[email protected]>
> wrote:
> > > >
> > > > >I think in the API, there is windowId in the checkpointed callback
> for
> > > > >cases when checkpointing could happen within application windows.
> > > > >
> > > > >IMHO backward compatibility is broken. For 3 years since the
> platform
> > > was
> > > > >created the semantics of checkpointed were
> > > > >beginWindow(x) -> endwindow -> checkpointWindow(x) when
> checkpointing
> > > was
> > > > >aligned with application window boundaries.
> > > > >
> > > >
> > > > :-) I did not write it that semantics. Looks like someone made that
> > > > assumption along the way. Fix the code which assumes that.
> > > >
> > > > ‹
> > > > Chetan
> > > >
> > > > >Please not that aligning Checkpointing with Application Window
> > boundary
> > > is
> > > > >the DEFAUL behavior and I think most of us agree that aligning the
> > > > >checkpointing with application window boundaries is what we see in
> > every
> > > > >use case.
> > > > >
> > > > >Code was written with that assumption by committers of Apex (not
> even
> > by
> > > > >people who are new to Apex). It is broken by by a change introduced
> > > couple
> > > > >of months back (August 2015).
> > > > >
> > > > >Moreover, what do we achieve by NOT guaranteeing that semantics-
> > > delegate
> > > > >the complexity to operators to handle it. Even a simple scenario
> that
> > I
> > > > >mentioned in my first mail, is very complicated.
> > > > >
> > > > >I think this is a big issue and we need to fix it.
> > > > >
> > > > >Chandni
> > > > >
> > > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <
> > [email protected]
> > > >
> > > > >wrote:
> > > > >
> > > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > > >> windowId parameter
> > > > >> leading to a natural guarantee that it matches the immediately
> > > > >> preceding beginWindow().
> > > > >>
> > > > >> Both committed() and checkpointed() take that parameter which
> > suggests
> > > > >>it
> > > > >> may
> > > > >> lag behind the current window. The comments on those methods say
> > > nothing
> > > > >> that can be be interpreted as a guarantee that the windowId will
> > match
> > > > >>the
> > > > >> window just processed. So, from the viewpoint of "original intent"
> > --
> > > > >> a phrase that
> > > > >> has a long and storied history in jurisprudence
> > > > >> (https://en.wikipedia.org/wiki/Original_intent) --
> > > > >> it seems that any code that assumed a guarantee when none existed
> > must
> > > > >> be regarded
> > > > >> as erroneous.
> > > > >>
> > > > >> Having said that, we are all aware that in software it is not at
> all
> > > > >> unusual to preserve behavior
> > > > >> in the interests of backward compatibility, regardless of many
> other
> > > > >> reasons for that behavior
> > > > >> to be considered objectionable. So, if the cost of fixing all the
> > code
> > > > >> that makes that
> > > > >> assumption is too high, we should seriously consider reverting it.
> > In
> > > > >> this context, the
> > > > >> guarantee that checkpointed() simply means that the operator state
> > has
> > > > >> been serialized
> > > > >> seems adequate.
> > > > >>
> > > > >> We have a roughly analogous situation with respect to OS write()
> > > calls:
> > > > >> When the call returns, we _do not_ have an assurance that the data
> > has
> > > > >> gone to disk.
> > > > >> All we know is that the OS has copied the data to its buffers for
> > > > >> flushing to disk. If we
> > > > >> want to know when the actual write to disk is done, we need to
> use a
> > > > >> different call -- fsync().
> > > > >>
> > > > >> Ram
> > > > >>
> > > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni
> > > > >><[email protected]>
> > > > >> wrote:
> > > > >> > I think the original intent for checkpointed callback was that
> it
> > > can
> > > > >>be
> > > > >> > called anytime after checkpoint and not necessarily immediately
> > > after
> > > > >>the
> > > > >> > window prior of checkpoint. As Gaurav mentioned the API bears it
> > > out.
> > > > >> > Furthermore the callback has to be called within the lifecycle
> > > > >>methods of
> > > > >> > the operator so it will be called inside a window so you anyway
> > have
> > > > >>to
> > > > >> > deal with new data anyway before the callback is called. Even
> > though
> > > > >> > checkpointed is not committed shouldn't it at least guarantee
> that
> > > the
> > > > >> > operator is recoverable to that state in which case it should be
> > > > >>called
> > > > >> > after the save to HDFS actually finishes.
> > > > >> >
> > > > >> > Thanks
> > > > >> >
> > > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <
> > > [email protected]
> > > > >
> > > > >> > wrote:
> > > > >> >
> > > > >> >> IMO, I don¹t think there is any backward incompatibility wrt
> > > > >> Checkpointing
> > > > >> >> call back semantics because
> > > > >> >>
> > > > >> >> 1. The checkpointed call is only made once the operator state
> is
> > > > >> preserved.
> > > > >> >> 2. The window ids being passed to checkpointed are in
> increasing
> > > > >>order.
> > > > >> >> 3. The window ids being passed are still the same ids as were
> > > passed
> > > > >> >> earlier.
> > > > >> >> 4. The sequence is still
> > > begingWindow()->endWindow()->checkpointed().
> > > > >> >>
> > > > >> >> Thanks
> > > > >> >> - Gaurav
> > > > >> >>
> > > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <
> > > [email protected]>
> > > > >> >> wrote:
> > > > >> >> >
> > > > >> >> > If the requirement is that the order is always
> > > > >> >> begingWindow()->endWindow()->checkpointed(), why to pass
> windowId
> > > in
> > > > >>the
> > > > >> >> checkpointed() call back?
> > > > >> >> >
> > > > >> >> > Thanks
> > > > >> >> > - Gaurav
> > > > >> >> >
> > > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh
> > > > >><[email protected]
> > > > >> >> <mailto:[email protected]>> wrote:
> > > > >> >> >>
> > > > >> >> >> FYI,
> > > > >> >> >>
> > > > >> >> >> HDHTWriter implementation is dependent on the older
> semantics
> > > and
> > > > >> seems
> > > > >> >> to
> > > > >> >> >> be broken now.
> > > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > >> >> >> In the checkpointed implementation, it copies certain state
> > > > >> (transient)
> > > > >> >> and
> > > > >> >> >> transfers it to a checkpointedWriteCache with respect to
> > window
> > > > >>'x'.
> > > > >> >> >>
> > > > >> >> >> With Async checkpointing it, the state that is transferred
> is
> > > much
> > > > >> more
> > > > >> >> >> recent than window 'x'.
> > > > >> >> >>
> > > > >> >> >> Chandni
> > > > >> >> >>
> > > > >> >> >>
> > > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <
> > > > >> >> [email protected] <mailto:[email protected]>>
> > > > >> >> >> wrote:
> > > > >> >> >>
> > > > >> >> >>> Agreed. Thomas's solution fixes the backward
> > incompatibility. I
> > > > >> think
> > > > >> >> we
> > > > >> >> >>> really need to fix this.
> > > > >> >> >>>
> > > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <
> > > > >> [email protected]
> > > > >> >> <mailto:[email protected]>>
> > > > >> >> >>> wrote:
> > > > >> >> >>>
> > > > >> >> >>>> Gaurav,
> > > > >> >> >>>>
> > > > >> >> >>>> I think if the state copy fails then STRAM should roll
> back
> > > the
> > > > >> >> operator
> > > > >> >> >>>> to
> > > > >> >> >>>> a checkpoint that is further back than the last
> checkpoint.
> > If
> > > > >>you
> > > > >> are
> > > > >> >> >>>> saying that you want to preserve the semantic that
> > > checkpointed
> > > > >>is
> > > > >> >> only
> > > > >> >> >>>> called after a checkpoint is completed, I would argue that
> > > that
> > > > >> >> guarantee
> > > > >> >> >>>> is already pointless in the current implementation since
> it
> > is
> > > > >> always
> > > > >> >> >>>> possible for an operator to be rolled back to a checkpoint
> > > > >>before
> > > > >> it's
> > > > >> >> >>>> last
> > > > >> >> >>>> completed checkpoint. So, it is already currently possible
> > for
> > > > >>some
> > > > >> >> >>>> database or file operation performed after a completed
> > > > >>checkpoint
> > > > >> to
> > > > >> >> be
> > > > >> >> >>>> redone after a failure. Because of this I think Thomas's
> > > > >>solution
> > > > >> >> makes
> > > > >> >> >>>> the
> > > > >> >> >>>> most sense. Thomas's solution would also address Chandni's
> > > > >>original
> > > > >> >> point
> > > > >> >> >>>> that the semantics for the checkpointed call back have
> been
> > > > >> violated.
> > > > >> >> >>>> There
> > > > >> >> >>>> are operators in our libraries that have depended on the
> > > > >> >> beginWindow(x),
> > > > >> >> >>>> endWindow(x), and checkpointed(x) call sequence, which is
> > now
> > > > >> broken.
> > > > >> >> We
> > > > >> >> >>>> should probably fix that.
> > > > >> >> >>>>
> > > > >> >> >>>> Tim
> > > > >> >> >>>>
> > > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <
> > > > >> >> [email protected] <mailto:[email protected]>>
> > > > >> >> >>>> wrote:
> > > > >> >> >>>>
> > > > >> >> >>>>> Thomas,
> > > > >> >> >>>>>
> > > > >> >> >>>>> This was done to preserve checkpointing semantics that is
> > to
> > > > >>tell
> > > > >> the
> > > > >> >> >>>>> operator that its state is preserved. Say if database is
> > > > >>updated
> > > > >> or
> > > > >> >> >>>> files
> > > > >> >> >>>>> are moved in checkpointed call but the state copy fails,
> > how
> > > to
> > > > >> >> address
> > > > >> >> >>>>> such scenarios?
> > > > >> >> >>>>>
> > > > >> >> >>>>> Thanks
> > > > >> >> >>>>> - Gaurav
> > > > >> >> >>>>>
> > > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <
> > > > >> [email protected]
> > > > >> >> <mailto:[email protected]>>
> > > > >> >> >>>>> wrote:
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> Alternatively I would ask why the checkpointed callback
> > > needs
> > > > >>to
> > > > >> >> wait
> > > > >> >> >>>>> until
> > > > >> >> >>>>>> the data was copied to HDFS instead upon completion of
> the
> > > > >>state
> > > > >> >> >>>>>> serialization.
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> Thomas
> > > > >> >> >>>>>>
> > > > >> >> >>>>>>
> > > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <
> > > > >> >> >>>> [email protected] <mailto:[email protected]>>
> > > > >> >> >>>>>> wrote:
> > > > >> >> >>>>>>
> > > > >> >> >>>>>>> Gaurav,
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> My question is about why Async was made the default
> when
> > it
> > > > >> changed
> > > > >> >> >>>> the
> > > > >> >> >>>>>>> semantics of operator callbacks. Your response doesn't
> > > answer
> > > > >> that.
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> Chandni
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <
> > > > >> >> >>>> [email protected] <mailto:[email protected]>>
> > > > >> >> >>>>>>> wrote:
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>>>> The idea behind Async checkpointing is to unblock
> > operator
> > > > >> while
> > > > >> >> the
> > > > >> >> >>>>>>> state
> > > > >> >> >>>>>>>> is getting transferred to HDFS.
> > > > >> >> >>>>>>>> Just to clarify that this beginWindow (x) ->
> > endWindow(x)
> > > ->
> > > > >> >> >>>>> checkpointed
> > > > >> >> >>>>>>>> (x-1 ) should be an ideal sequence, but if the HDFS is
> > > slow
> > > > >>or
> > > > >> for
> > > > >> >> >>>> some
> > > > >> >> >>>>>>>> other reason transferring the state to HDFS is slow
> this
> > > > >> sequence
> > > > >> >> >>>> may
> > > > >> >> >>>>> not
> > > > >> >> >>>>>>>> hold true.
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>> Can your use case be addressed by
> > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > > >> >> https://malhar.atlassian.net/browse/APEX-78> <
> > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78 <
> > > > >> >> https://malhar.atlassian.net/browse/APEX-78>>?
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>> Thanks
> > > > >> >> >>>>>>>> - Gaurav
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <
> > > > >> >> >>>> [email protected] <mailto:[email protected]>>
> > > > >> >> >>>>>>>> wrote:
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> With Async checkpointing the checkpoint callback in
> > > > >> >> CheckpointPoint
> > > > >> >> >>>>>>>>> listener is called for a previous window, that is,
> > > > >> >> >>>>>>>>> beginWindow (x) -> endWindow(x) -> checkpointed (x-1
> )
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> > > > >> >> checkpointing,
> > > > >> >> >>>> the
> > > > >> >> >>>>>>>>> behavior was always
> > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed (x)
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > > > >> checkpointing
> > > > >> >> >>>> was
> > > > >> >> >>>>>>>>> introduced and few of them can rely on the sequencing
> > > > >> guaranteed
> > > > >> >> by
> > > > >> >> >>>>>>>>> synchronous checkpointing.
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> So why was Async Checkpointed made default?
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> With how Async checkpoint is today, the complexity to
> > > > >>handle
> > > > >> >> >>>> transient
> > > > >> >> >>>>>>>>> state in checkpointed callback falls on every
> operator.
> > > For
> > > > >> eg,
> > > > >> >> >>>> lets
> > > > >> >> >>>>>>> say
> > > > >> >> >>>>>>>>> earlier I had a transient map which I cleared every
> > time
> > > > >>the
> > > > >> >> >>>>>>> checkpointed
> > > > >> >> >>>>>>>>> was called, with async checkpointing this simple task
> > > will
> > > > >>be
> > > > >> a
> > > > >> >> lot
> > > > >> >> >>>>>>> more
> > > > >> >> >>>>>>>>> complicated.
> > > > >> >> >>>>>>>>>
> > > > >> >> >>>>>>>>> I think Async checkpointing broke the semantics of
> > > operator
> > > > >> >> >>>> callbacks
> > > > >> >> >>>>>>> and
> > > > >> >> >>>>>>>>> should NOT be the default.
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>>
> > > > >> >> >>>>>>>
> > > > >> >> >>>>>
> > > > >> >> >>>>>
> > > > >> >> >>>>
> > > > >> >> >>>
> > > > >> >> >>>
> > > > >> >> >
> > > > >> >>
> > > > >> >>
> > > > >>
> > > >
> > > >
> > >
> >
>

Reply via email to