Discarding state that was checkpointed is a good example of how the callback
can be used. I don't see a reason to wait until the asynchronous copy to
HDFS has completed to do this. In fact, with the guarantee that the
call occurs before the next beginWindow, it is easier to implement in the
operator, because there is no need to track which portion of state belongs
to which window.
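
To illustrate the difference, here is a minimal sketch. It uses hypothetical
stand-in classes (SimpleDiscardOperator, WindowTrackingOperator), not the
actual Apex operator interfaces. The first variant assumes checkpointed(x)
arrives before the next beginWindow; the second assumes it may arrive after
later windows have already been processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-ins for illustration; these are not the Apex operator interfaces.

// Assumes checkpointed(x) is invoked after endWindow of window x and before the next beginWindow.
class SimpleDiscardOperator {
    private final List<String> sinceLastCheckpoint = new ArrayList<>();

    void beginWindow(long windowId) { }
    void process(String tuple) { sinceLastCheckpoint.add(tuple); }
    void endWindow() { }

    void checkpointed(long windowId) {
        // Everything buffered so far belongs to windows <= windowId, so drop it all.
        sinceLastCheckpoint.clear();
    }

    int bufferedCount() { return sinceLastCheckpoint.size(); }
}

// Assumes checkpointed(x) may arrive while later windows have already been processed.
class WindowTrackingOperator {
    private final TreeMap<Long, List<String>> byWindow = new TreeMap<>();
    private long currentWindowId;

    void beginWindow(long windowId) { currentWindowId = windowId; }
    void process(String tuple) {
        byWindow.computeIfAbsent(currentWindowId, id -> new ArrayList<>()).add(tuple);
    }
    void endWindow() { }

    void checkpointed(long windowId) {
        // Drop only the windows covered by the checkpoint; newer windows must be kept.
        byWindow.headMap(windowId, true).clear();
    }

    int bufferedCount() {
        int n = 0;
        for (List<String> tuples : byWindow.values()) { n += tuples.size(); }
        return n;
    }
}
```

The second variant has to carry a per-window index only to answer the
question "which state does this checkpoint cover?", which the first variant
gets for free from the call ordering.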

Perhaps we can define new checkpointing-related callbacks with actual use
cases in mind (we now have a few). Then we can extend the existing listener
interface so that the new callbacks remain optional for existing listener
code, while operators that require different semantics can take advantage
of the support.
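
One possible shape for this, as a rough sketch only: every interface, class
and method name below (CheckpointListenerSketch,
PersistedCheckpointListenerSketch, checkpointPersisted, EngineSketch) is
hypothetical and chosen for illustration. Existing listeners implement only
the base interface and are never called with the new callback; operators opt
in by implementing the extension.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the existing listener interface.
interface CheckpointListenerSketch {
    void checkpointed(long windowId);
    void committed(long windowId);
}

// Hypothetical opt-in extension; the callback name is illustrative only.
interface PersistedCheckpointListenerSketch extends CheckpointListenerSketch {
    void checkpointPersisted(long windowId);
}

// An existing operator that knows nothing about the new callback.
class LegacyOperatorSketch implements CheckpointListenerSketch {
    final List<String> calls = new ArrayList<>();
    @Override public void checkpointed(long windowId) { calls.add("checkpointed:" + windowId); }
    @Override public void committed(long windowId) { calls.add("committed:" + windowId); }
}

// An operator that opts in to the additional callback.
class PersistAwareOperatorSketch extends LegacyOperatorSketch
        implements PersistedCheckpointListenerSketch {
    @Override public void checkpointPersisted(long windowId) { calls.add("persisted:" + windowId); }
}

// Hypothetical engine-side dispatch: only listeners that opted in are notified.
class EngineSketch {
    static boolean notifyPersisted(CheckpointListenerSketch listener, long windowId) {
        if (listener instanceof PersistedCheckpointListenerSketch) {
            ((PersistedCheckpointListenerSketch) listener).checkpointPersisted(windowId);
            return true;
        }
        return false;
    }
}
```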

Until we have this, how about we restore the previous behavior temporarily?
Calling checkpointed() immediately does not seem to pose any practical
issue but ensures that the code that was written under this assumption is
not broken.

Thomas





On Mon, Nov 23, 2015 at 12:12 PM, Pramod Immaneni <[email protected]>
wrote:

> Tim,
>
> If the operator is such that I can discard the state from the previous
> checkpoint to the current checkpoint on the checkpointed callback then the
> state can be reconstructed by the incoming data on replay and I don't need
> to wait till committed to remove the unwanted state. It's more efficient
> memory-wise.
>
> Thanks
>
> On Mon, Nov 23, 2015 at 11:17 AM, Timothy Farkas <[email protected]>
> wrote:
>
> > Pramod,
> >
> > Very good point about checkpointed being called within a window boundary.
> > But I still don't understand the purpose of calling checkpointed after the
> > asynchronous copy to HDFS is completed. It makes checkpointed
> > implementations more complex, and it does not provide any benefits. The
> > checkpointed implementation logic could be called multiple times for the
> > same window if the operator fails and is restored to an earlier checkpoint;
> > this fact invalidates any atomicity guarantees that you are trying to
> > preserve. I would like a concrete example of when it is necessary to call
> > checkpointed after the asynchronous copy to HDFS. The only use case I can
> > think of is if the operator reads its own checkpointed state from HDFS in
> > the checkpointed callback, but this is not a valid use case.
> >
> > Tim
> >
> > On Mon, Nov 23, 2015 at 10:56 AM, Pramod Immaneni <[email protected]>
> > wrote:
> >
> > > There is a problem I see with the older way in which checkpointed was
> > > called as well, which is relevant to this. It is called outside of the
> > > window boundary, between end window and begin window, which I don't think
> > > is correct, as the platform should not run operator business logic outside
> > > of window boundaries. The operator could end up sending tuples
> > > accidentally in this call, for example.
> > >
> > > I think we need to fix that and also not assume that the checkpointed
> > > callback will be called immediately after end window of the window being
> > > checkpointed.
> > >
> > > Thanks
> > >
> > > On Mon, Nov 23, 2015 at 10:38 AM, Chandni Singh <[email protected]>
> > > wrote:
> > >
> > > > :-) I did not write it that semantics. Looks like someone made that
> > > > assumption along the way. Fix the code which assumes that.
> > > >
> > > > What do you mean?
> > > > We go on fixing all the code which was written because the behavior with
> > > > synchronous checkpointing was
> > > > beginWindow(x) -> endWindow -> checkpointWindow(x) (when checkpointing
> > > > is aligned with application window boundary)?
> > > >
> > > > This was broken recently with async checkpointing, but instead of fixing
> > > > that, we go on fixing all the existing solutions.
> > > > I guess I find this ridiculous.
> > > >
> > > > And why should we make it harder and harder to write operators?
> > > > I think the majority of people who participated in this discussion do
> > > > believe that we need to fix this, and I think this is critical.
> > > >
> > > > Chandni
> > > >
> > > >
> > > >
> > > > On Mon, Nov 23, 2015 at 10:16 AM, Chetan Narsude (cnarsude) <
> > > > [email protected]> wrote:
> > > >
> > > > >
> > > > >
> > > > > On 11/23/15, 8:44 AM, "Chandni Singh" <[email protected]>
> > > > > wrote:
> > > > >
> > > > > >I think in the API, there is windowId in the checkpointed callback for
> > > > > >cases when checkpointing could happen within application windows.
> > > > > >
> > > > > >IMHO backward compatibility is broken. For 3 years since the platform was
> > > > > >created the semantics of checkpointed were
> > > > > >beginWindow(x) -> endWindow -> checkpointWindow(x) when checkpointing was
> > > > > >aligned with application window boundaries.
> > > > > >
> > > > >
> > > > > :-) I did not write it that semantics. Looks like someone made that
> > > > > assumption along the way. Fix the code which assumes that.
> > > > >
> > > > > --
> > > > > Chetan
> > > > >
> > > > > >Please note that aligning checkpointing with the application window
> > > > > >boundary is the DEFAULT behavior, and I think most of us agree that
> > > > > >aligning the checkpointing with application window boundaries is what we
> > > > > >see in every use case.
> > > > > >
> > > > > >Code was written with that assumption by committers of Apex (not even by
> > > > > >people who are new to Apex). It is broken by a change introduced a couple
> > > > > >of months back (August 2015).
> > > > > >
> > > > > >Moreover, what do we achieve by NOT guaranteeing that semantics? We just
> > > > > >delegate the complexity to operators to handle it. Even the simple
> > > > > >scenario that I mentioned in my first mail is very complicated.
> > > > > >
> > > > > >I think this is a big issue and we need to fix it.
> > > > > >
> > > > > >Chandni
> > > > > >
> > > > > >On Mon, Nov 23, 2015 at 7:12 AM, Munagala Ramanath <[email protected]>
> > > > > >wrote:
> > > > > >
> > > > > >> I too find Gaurav's argument cogent: endWindow() does not take a
> > > > > >> windowId parameter, leading to a natural guarantee that it matches the
> > > > > >> immediately preceding beginWindow().
> > > > > >>
> > > > > >> Both committed() and checkpointed() take that parameter, which suggests
> > > > > >> it may lag behind the current window. The comments on those methods say
> > > > > >> nothing that can be interpreted as a guarantee that the windowId will
> > > > > >> match the window just processed. So, from the viewpoint of "original
> > > > > >> intent" -- a phrase that has a long and storied history in jurisprudence
> > > > > >> (https://en.wikipedia.org/wiki/Original_intent) -- it seems that any code
> > > > > >> that assumed a guarantee when none existed must be regarded as erroneous.
> > > > > >>
> > > > > >> Having said that, we are all aware that in software it is not at all
> > > > > >> unusual to preserve behavior in the interests of backward compatibility,
> > > > > >> regardless of many other reasons for that behavior to be considered
> > > > > >> objectionable. So, if the cost of fixing all the code that makes that
> > > > > >> assumption is too high, we should seriously consider reverting it. In
> > > > > >> this context, the guarantee that checkpointed() simply means that the
> > > > > >> operator state has been serialized seems adequate.
> > > > > >>
> > > > > >> We have a roughly analogous situation with respect to OS write() calls:
> > > > > >> When the call returns, we _do not_ have an assurance that the data has
> > > > > >> gone to disk. All we know is that the OS has copied the data to its
> > > > > >> buffers for flushing to disk. If we want to know when the actual write
> > > > > >> to disk is done, we need to use a different call -- fsync().
> > > > > >>
> > > > > >> Ram
> > > > > >>
> > > > > >> On Mon, Nov 23, 2015 at 6:19 AM, Pramod Immaneni <[email protected]>
> > > > > >> wrote:
> > > > > >> > I think the original intent for the checkpointed callback was that it
> > > > > >> > can be called anytime after the checkpoint and not necessarily
> > > > > >> > immediately after the window prior to the checkpoint. As Gaurav
> > > > > >> > mentioned, the API bears it out. Furthermore, the callback has to be
> > > > > >> > called within the lifecycle methods of the operator, so it will be
> > > > > >> > called inside a window, so you have to deal with new data anyway before
> > > > > >> > the callback is called. Even though checkpointed is not committed,
> > > > > >> > shouldn't it at least guarantee that the operator is recoverable to
> > > > > >> > that state? In that case it should be called after the save to HDFS
> > > > > >> > actually finishes.
> > > > > >> >
> > > > > >> > Thanks
> > > > > >> >
> > > > > >> > On Mon, Nov 23, 2015 at 5:38 AM, Gaurav Gupta <[email protected]>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> >> IMO, I don't think there is any backward incompatibility wrt the
> > > > > >> >> checkpointing callback semantics because
> > > > > >> >>
> > > > > >> >> 1. The checkpointed call is only made once the operator state is
> > > > > >> >> preserved.
> > > > > >> >> 2. The window ids being passed to checkpointed are in increasing order.
> > > > > >> >> 3. The window ids being passed are still the same ids as were passed
> > > > > >> >> earlier.
> > > > > >> >> 4. The sequence is still beginWindow()->endWindow()->checkpointed().
> > > > > >> >>
> > > > > >> >> Thanks
> > > > > >> >> - Gaurav
> > > > > >> >>
> > > > > >> >> > On Nov 23, 2015, at 1:03 AM, Gaurav Gupta <[email protected]>
> > > > > >> >> > wrote:
> > > > > >> >> >
> > > > > >> >> > If the requirement is that the order is always
> > > > > >> >> > beginWindow()->endWindow()->checkpointed(), why pass windowId in the
> > > > > >> >> > checkpointed() callback?
> > > > > >> >> >
> > > > > >> >> > Thanks
> > > > > >> >> > - Gaurav
> > > > > >> >> >
> > > > > >> >> >> On Nov 22, 2015, at 11:22 PM, Chandni Singh <[email protected]>
> > > > > >> >> >> wrote:
> > > > > >> >> >>
> > > > > >> >> >> FYI,
> > > > > >> >> >>
> > > > > >> >> >> The HDHTWriter implementation is dependent on the older semantics and
> > > > > >> >> >> seems to be broken now:
> > > > > >> >> >> startWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > > >> >> >> In the checkpointed implementation, it copies certain (transient)
> > > > > >> >> >> state and transfers it to a checkpointedWriteCache with respect to
> > > > > >> >> >> window 'x'.
> > > > > >> >> >>
> > > > > >> >> >> With async checkpointing, the state that is transferred is much more
> > > > > >> >> >> recent than window 'x'.
> > > > > >> >> >>
> > > > > >> >> >> Chandni
> > > > > >> >> >>
> > > > > >> >> >>
> > > > > >> >> >> On Sun, Nov 22, 2015 at 11:04 PM, Chandni Singh <[email protected]>
> > > > > >> >> >> wrote:
> > > > > >> >> >>
> > > > > >> >> >>> Agreed. Thomas's solution fixes the backward incompatibility. I think
> > > > > >> >> >>> we really need to fix this.
> > > > > >> >> >>>
> > > > > >> >> >>> On Sun, Nov 22, 2015 at 10:23 PM, Timothy Farkas <[email protected]>
> > > > > >> >> >>> wrote:
> > > > > >> >> >>>
> > > > > >> >> >>>> Gaurav,
> > > > > >> >> >>>>
> > > > > >> >> >>>> I think if the state copy fails then STRAM should roll back the
> > > > > >> >> >>>> operator to a checkpoint that is further back than the last
> > > > > >> >> >>>> checkpoint. If you are saying that you want to preserve the semantic
> > > > > >> >> >>>> that checkpointed is only called after a checkpoint is completed, I
> > > > > >> >> >>>> would argue that that guarantee is already pointless in the current
> > > > > >> >> >>>> implementation, since it is always possible for an operator to be
> > > > > >> >> >>>> rolled back to a checkpoint before its last completed checkpoint.
> > > > > >> >> >>>> So, it is already currently possible for some database or file
> > > > > >> >> >>>> operation performed after a completed checkpoint to be redone after
> > > > > >> >> >>>> a failure. Because of this I think Thomas's solution makes the most
> > > > > >> >> >>>> sense. Thomas's solution would also address Chandni's original point
> > > > > >> >> >>>> that the semantics for the checkpointed callback have been violated.
> > > > > >> >> >>>> There are operators in our libraries that have depended on the
> > > > > >> >> >>>> beginWindow(x), endWindow(x), and checkpointed(x) call sequence,
> > > > > >> >> >>>> which is now broken. We should probably fix that.
> > > > > >> >> >>>>
> > > > > >> >> >>>> Tim
> > > > > >> >> >>>>
> > > > > >> >> >>>> On Sun, Nov 22, 2015 at 10:02 PM, Gaurav Gupta <[email protected]>
> > > > > >> >> >>>> wrote:
> > > > > >> >> >>>>
> > > > > >> >> >>>>> Thomas,
> > > > > >> >> >>>>>
> > > > > >> >> >>>>> This was done to preserve checkpointing semantics, that is, to tell
> > > > > >> >> >>>>> the operator that its state is preserved. Say a database is updated
> > > > > >> >> >>>>> or files are moved in the checkpointed call but the state copy
> > > > > >> >> >>>>> fails; how do we address such scenarios?
> > > > > >> >> >>>>>
> > > > > >> >> >>>>> Thanks
> > > > > >> >> >>>>> - Gaurav
> > > > > >> >> >>>>>
> > > > > >> >> >>>>>> On Nov 22, 2015, at 9:44 PM, Thomas Weise <[email protected]>
> > > > > >> >> >>>>>> wrote:
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> Alternatively I would ask why the checkpointed callback needs to
> > > > > >> >> >>>>>> wait until the data was copied to HDFS instead of being called
> > > > > >> >> >>>>>> upon completion of the state serialization.
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> Thomas
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>> On Sun, Nov 22, 2015 at 9:41 PM, Chandni Singh <[email protected]>
> > > > > >> >> >>>>>> wrote:
> > > > > >> >> >>>>>>
> > > > > >> >> >>>>>>> Gaurav,
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> My question is about why async was made the default when it
> > > > > >> >> >>>>>>> changed the semantics of operator callbacks. Your response
> > > > > >> >> >>>>>>> doesn't answer that.
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> In a way we broke backward compatibility.
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> Chandni
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>> On Sun, Nov 22, 2015 at 9:22 PM, Gaurav Gupta <[email protected]>
> > > > > >> >> >>>>>>> wrote:
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>>>> The idea behind async checkpointing is to unblock the operator
> > > > > >> >> >>>>>>>> while the state is getting transferred to HDFS.
> > > > > >> >> >>>>>>>> Just to clarify: beginWindow(x) -> endWindow(x) ->
> > > > > >> >> >>>>>>>> checkpointed(x-1) should be the ideal sequence, but if HDFS is
> > > > > >> >> >>>>>>>> slow, or for some other reason transferring the state to HDFS is
> > > > > >> >> >>>>>>>> slow, this sequence may not hold true.
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>> Can your use case be addressed by
> > > > > >> >> >>>>>>>> https://malhar.atlassian.net/browse/APEX-78?
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>> Thanks
> > > > > >> >> >>>>>>>> - Gaurav
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>> On Nov 22, 2015, at 3:56 PM, Chandni Singh <[email protected]>
> > > > > >> >> >>>>>>>>> wrote:
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> With async checkpointing the checkpointed callback in
> > > > > >> >> >>>>>>>>> CheckpointListener is called for a previous window, that is,
> > > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed(x-1)
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> This feature was newly introduced. With synchronous
> > > > > >> >> >>>>>>>>> checkpointing, the behavior was always
> > > > > >> >> >>>>>>>>> beginWindow(x) -> endWindow(x) -> checkpointed(x)
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> A lot of operators were written before asynchronous
> > > > > >> >> >>>>>>>>> checkpointing was introduced, and a few of them may rely on the
> > > > > >> >> >>>>>>>>> sequencing guaranteed by synchronous checkpointing.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> So why was async checkpointing made the default?
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> With how async checkpointing is today, the complexity of
> > > > > >> >> >>>>>>>>> handling transient state in the checkpointed callback falls on
> > > > > >> >> >>>>>>>>> every operator. For example, let's say earlier I had a
> > > > > >> >> >>>>>>>>> transient map which I cleared every time checkpointed was
> > > > > >> >> >>>>>>>>> called; with async checkpointing this simple task will be a lot
> > > > > >> >> >>>>>>>>> more complicated.
> > > > > >> >> >>>>>>>>>
> > > > > >> >> >>>>>>>>> I think async checkpointing broke the semantics of operator
> > > > > >> >> >>>>>>>>> callbacks and should NOT be the default.
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>>
> > > > > >> >> >>>>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>>
> > > > > >> >> >>>>
> > > > > >> >> >>>
> > > > > >> >> >>>
> > > > > >> >> >
> > > > > >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
