What you are looking for would require a guarantee that checkpointed is
called before the next beginWindow.

Why don't you record the list in endWindow? Is it a performance concern?

This would then benefit from https://malhar.atlassian.net/browse/APEX-78
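
A minimal sketch of recording the list in endWindow, for illustration only.
DeferredDeletes, recordDelete and drainUpTo are hypothetical names, not part
of the Apex API. The sketch assumes the window id later handed to a
checkpointed or committed callback can be compared with the ids seen in
beginWindow, and that both fields are kept as part of the operator's
checkpointed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper: defers deletes until a window is known to be safe. */
class DeferredDeletes {
  // Paths recorded while the current window is being processed.
  private final List<String> pending = new ArrayList<>();
  // Per-window snapshots taken in endWindow, keyed by window id.
  private final TreeMap<Long, List<String>> snapshots = new TreeMap<>();
  private long currentWindowId;

  void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  /** Record a path instead of deleting it right away. */
  void recordDelete(String path) {
    pending.add(path);
  }

  /** Snapshot the list as of the end of the current window. */
  void endWindow() {
    if (!pending.isEmpty()) {
      snapshots.put(currentWindowId, new ArrayList<>(pending));
      pending.clear();
    }
  }

  /**
   * Called from whichever callback reports a safe window id, possibly late.
   * Returns and forgets every path snapshotted at or before that window.
   */
  List<String> drainUpTo(long windowId) {
    NavigableMap<Long, List<String>> done = snapshots.headMap(windowId, true);
    List<String> safe = new ArrayList<>();
    for (List<String> paths : done.values()) {
      safe.addAll(paths);
    }
    done.clear(); // the view is backed by snapshots, so this removes them
    return safe;
  }
}
```

Because the snapshots are keyed by window id, it does not matter whether the
callback arrives before or after the next beginWindow: paths recorded in
later windows stay in the map until their own window id is reported.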



On Tue, Nov 10, 2015 at 11:20 PM, Bhupesh Chawda <[email protected]>
wrote:

> I was trying to do some processing between two checkpoint windows.
>
> So, for example, an operator deletes some files from hdfs as part of normal
> processing within an application window. However, since hdfs deletes cannot
> be rolled back by the platform, I am trying NOT to delete the files from
> hdfs but just record them in a list. Now when a checkpoint happens, I need
> to take a snapshot of the list *as of the checkpoint* and later delete them
> asynchronously. This process of taking the snapshot is what I need to do
> between two checkpoint windows. This can be done anytime between the
> endWindow just before the checkpoint and beginWindow just after the
> checkpoint.
>
> So the question is, can this be done in the checkpointed callback?
>
> If the callback is asynchronous, i.e. it may be called after the next
> window starts, then probably checkpointed is not the right place to do it.
> On 11-Nov-2015 11:20 am, "Thomas Weise" <[email protected]> wrote:
>
> > There does not seem to be a problem with the order of the checkpointed
> > callbacks.
> >
> > I would however question why the callback is delayed until the write to
> > storage is complete. From the operator's perspective, it can be called as
> > soon as serialization is complete, which is always synchronous.
> >
> > The checkpoint cannot be reported to StrAM until the file copy is done, as
> > currently implemented.
> >
> > Bhupesh, what problem were you looking to solve through the checkpointed
> > callback?
> >
> >
> >
> >
> > On Tue, Nov 10, 2015 at 3:57 PM, Chetan Narsude <[email protected]>
> wrote:
> >
> > > With async checkpointing the Node.reportStats is reporting it back and it
> > > looks like the code is checking for the top of the queue to see if it’s
> > > done (or not reporting at all). So I do not see a reason why they will be
> > > reported out of order.
> > >
> > > Chandni, I read your subsequent responses. Valid point in the last email
> > > about documentation. It amazes me how much the documentation of our API
> > > can be improved (I am probably the most guilty myself). One comment that I
> > > wanted to make even before that is that the semantics of copyToHdfs is
> > > that it either succeeds or throws an exception, in which case the recovery
> > > kicks in and nothing is reported as checkpointed.
> > >
> > > Tim, we need the unit test, man!
> > >
> > > —
> > > Chetan
> > >
> > >
> > > On 11/10/15, 3:05 PM, "Chetan Narsude (cnarsude)" <[email protected]>
> > > wrote:
> > >
> > > >There are a lot of things which are different when it comes to async
> > > >checkpointing. I was evaluating it in the morning and expect that I will
> > > >either be able to explain it or open JIRA issues. My partial observation is
> > > >that with async checkpointing, checkpointed is not issued (Chandni, the
> > > >last statement in the if block is "return"). I am digging into it but feel
> > > >free to chime in if someone else is able to find that.
> > > >
> > > >Also I realized that my morning email applies as it is to committed but
> > > >checkpointed has deviated a little bit from that. Will post the revised
> > > >response soon.
> > > >
> > > >--
> > > >Chetan
> > > >
> > > >
> > > >
> > > >
> > > >On 11/10/15, 2:04 PM, "Chandni Singh" <[email protected]>
> wrote:
> > > >
> > > >>Chetan,
> > > >>
> > > >>Looking at the checkpoint(windowId) in Node.java, I don't think the steps
> > > >>you mentioned are followed.
> > > >>
> > > >>*if (using AsyncFSStorageAgent)  {*
> > > >>*  asyncFSStorageAgent.copyToHdfs(...)*
> > > >>*}*
> > > >>*operator.checkpointed(windowId);*
> > > >>
> > > >>This means that even if copyToHdfs fails, the operator is notified that the
> > > >>window is checkpointed.
> > > >>
> > > >>Are we saying that copyToHdfs will never fail with AsyncFSStorageAgent
> > > >>for a window since the operator is notified that the window is
> > > >>checkpointed?
> > > >>
> > > >>Chandni
> > > >>
> > > >>On Tue, Nov 10, 2015 at 11:33 AM, Timothy Farkas <
> [email protected]>
> > > >>wrote:
> > > >>
> > > >>> Will do
> > > >>>
> > > >>> On Tue, Nov 10, 2015 at 11:01 AM, Pramod Immaneni
> > > >>><[email protected]>
> > > >>> wrote:
> > > >>>
> > > >>> > Is there a unit test covering it? Otherwise can you write one to test
> > > >>> > the hypothesis.
> > > >>> >
> > > >>> > On Tue, Nov 10, 2015 at 11:00 AM, Timothy Farkas
> > > >>><[email protected]>
> > > >>> > wrote:
> > > >>> >
> > > >>> > > That is what it is looking like to me. The task is submitted at
> > > >>> > > GenericNode#checkpoint line 504, then at the end of
> > > >>> > > GenericNode#checkpoint, line 531, checkpointed is called. I am likely
> > > >>> > > missing something, just would like to know what :)
> > > >>> > >
> > > >>> > > Tim
> > > >>> > >
> > > >>> > > On Tue, Nov 10, 2015 at 10:51 AM, Pramod Immaneni <
> > > >>> > [email protected]>
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > > > Tim,
> > > >>> > > >
> > > >>> > > > Are you suggesting that checkpointed is called before the
> > > >>> > > > checkpoint is completely persisted in the storage?
> > > >>> > > >
> > > >>> > > > Thanks
> > > >>> > > >
> > > >>> > > > On Tue, Nov 10, 2015 at 10:49 AM, Timothy Farkas <
> > > >>> [email protected]>
> > > >>> > > > wrote:
> > > >>> > > >
> > > >>> > > > > Chetan,
> > > >>> > > > >
> > > >>> > > > > I do not see the process of reporting the checkpoint to stram,
> > > >>> > > > > receiving the ack, and then calling checkpointed. The logic I'm
> > > >>> > > > > seeing in GenericNode line 484 is that the checkpoint method is
> > > >>> > > > > called, it spawns another thread that writes to hdfs, and then
> > > >>> > > > > checkpointed is immediately called afterwards. I am missing
> > > >>> > > > > something, can you give me some pointers so that I can better
> > > >>> > > > > understand the flow?
> > > >>> > > > >
> > > >>> > > > > Tim
> > > >>> > > > >
> > > >>> > > > > On Tue, Nov 10, 2015 at 10:33 AM, Munagala Ramanath <
> > > >>> > > [email protected]
> > > >>> > > > >
> > > >>> > > > > wrote:
> > > >>> > > > >
> > > >>> > > > > > Chetan's answer provides a good explanation as well as
> > > >>> > > > > > clarifying that the difference can be more than 1.
> > > >>> > > > > >
> > > >>> > > > > > Since checkpointing (i.e. "commit notification" as Thomas
> > > >>> > > > > > refers to it) is asynchronous, I'm curious about whether the
> > > >>> > > > > > window ids in the checkpointed call are guaranteed to be
> > > >>> > > > > > sequential or if they could be out of order, i.e. can the
> > > >>> > > > > > checkpointed call see window id 101 before it sees 100?
> > > >>> > > > > >
> > > >>> > > > > > Ram
> > > >>> > > > > >
> > > >>> > > > > > On Tue, Nov 10, 2015 at 10:27 AM, Bhupesh Chawda
> > > >>> > > > > > <[email protected]> wrote:
> > > >>> > > > > > > Hi Tim,
> > > >>> > > > > > > Thanks for the detailed explanation.
> > > >>> > > > > > > I understand that the sequence would be
> > > >>> > > > > > > beginWindow (x) -> endWindow (x) -> checkpointed (x) -> beginWindow (x+1)
> > > >>> > > > > > >
> > > >>> > > > > > > However when I try to print out the window ids in beginWindow,
> > > >>> > > > > > > endWindow and checkpointed calls, I see x and x-1 respectively.
> > > >>> > > > > > > I.e. if the window just before checkpoint is 100, I see that
> > > >>> > > > > > > the checkpointed call had window id 99.
> > > >>> > > > > > >
> > > >>> > > > > > > Note: This is observed in the local mode of Apex.
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks
> > > >>> > > > > > > -Bhupesh
> > > >>> > > > > > > On 10-Nov-2015 11:25 pm, "Timothy Farkas"
> > > >>><[email protected]
> > > >>> >
> > > >>> > > > wrote:
> > > >>> > > > > > >
> > > >>> > > > > > >> Hi Bhupesh,
> > > >>> > > > > > >>
> > > >>> > > > > > >> The sequencing of the checkpointed call in relation to
> > > >>> > > > > > >> beginWindow and endWindow depends on how your
> > > >>> > > > > > >> APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are
> > > >>> > > > > > >> configured. If the two WINDOW_COUNTs are not configured to be
> > > >>> > > > > > >> the same then it is possible that checkpointed is called
> > > >>> > > > > > >> within an application window. So the sequence of events would
> > > >>> > > > > > >> be this:
> > > >>> > > > > > >>
> > > >>> > > > > > >> beginWindow -> checkpointed -> endWindow
> > > >>> > > > > > >>
> > > >>> > > > > > >> If the APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are
> > > >>> > > > > > >> the same (default), then the sequence of calls would be this:
> > > >>> > > > > > >>
> > > >>> > > > > > >> beginWindow (100) -> endWindow (100) -> checkpointed (100) -> beginWindow (101)
> > > >>> > > > > > >>
> > > >>> > > > > > >> The numbers in the sequence represent possible streaming
> > > >>> > > > > > >> window Ids that each call would be associated with.
> > > >>> > > > > > >>
> > > >>> > > > > > >> The StateMachine which calls these callbacks for non-input
> > > >>> > > > > > >> operators is in GenericNode.java.
> > > >>> > > > > > >>
> > > >>> > > > > > >> Tim
> > > >>> > > > > > >>
> > > >>> > > > > > >> On Tue, Nov 10, 2015 at 3:36 AM, Bhupesh Chawda <
> > > >>> > > > > > [email protected]>
> > > >>> > > > > > >> wrote:
> > > >>> > > > > > >>
> > > >>> > > > > > >> > Hi Chetan / Community,
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > Can someone please elaborate on why the window id supplied
> > > >>> > > > > > >> > to CheckpointListener and the Operator would differ?
> > > >>> > > > > > >> > I tried looking at the window ids of checkpointed() and the
> > > >>> > > > > > >> > beginWindow() calls and they differ by 1. Don't know why
> > > >>> > > > > > >> > this should be the case.
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > Thanks.
> > > >>> > > > > > >> > -Bhupesh
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > On Thu, Sep 17, 2015 at 5:56 AM, Chetan Narsude <
> > > >>> > > > > > [email protected]>
> > > >>> > > > > > >> > wrote:
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > > Short answer is yes.
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > All the control tuples are scheduled to be delivered
> > > >>> > > > > > >> > > outside of the window. As checkpointed callback is
> > > >>> > > > > > >> > > triggered because of CHECKPOINT control tuple, it will
> > > >>> > > > > > >> > > happen after endWindow and before the next beginWindow.
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > The windowId supplied to CheckpointListener and the one
> > > >>> > > > > > >> > > provided to Operator need not match even though the
> > > >>> > > > > > >> > > sequence is defined. So I am curious how you intend to use
> > > >>> > > > > > >> > > this knowledge.
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > --
> > > >>> > > > > > >> > > Chetan
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > On Tue, Sep 15, 2015 at 8:31 AM, Thomas Weise <
> > > >>> > > > > > [email protected]>
> > > >>> > > > > > >> > > wrote:
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > > It has not changed the operator execution model. State
> > > >>> > > > > > >> > > > serialization is still synchronous, write to HDFS is
> > > >>> > > > > > >> > > > taken out of the operator thread.
> > > >>> > > > > > >> > > >
> > > >>> > > > > > >> > > > On Tue, Sep 15, 2015 at 8:18 AM, Amol Kekre <
> > > >>> > > > > [email protected]
> > > >>> > > > > > >
> > > >>> > > > > > >> > > wrote:
> > > >>> > > > > > >> > > >
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > Sent too soon. Has asynchronous checkpointing changed
> > > >>> > > > > > >> > > > > this?
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > Amol
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > Sent from my iPhone
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > > On Sep 15, 2015, at 12:38 AM, Bhupesh
> Chawda <
> > > >>> > > > > > >> > > [email protected]>
> > > >>> > > > > > >> > > > > wrote:
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > Hi All,
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > Is it safe to assume that the checkpointed() and the
> > > >>> > > > > > >> > > > > > beginWindow() calls are sequenced?
> > > >>> > > > > > >> > > > > > In other words, are these calls part of the same
> > > >>> > > > > > >> > > > > > thread and may not run in parallel?
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > Thanks.
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > --
> > > >>> > > > > > >> > > > > > -Bhupesh
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > >
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> >
> > > >>> > > > > > >>
> > > >>> > > > > >
> > > >>> > > > >
> > > >>> > > >
> > > >>> > >
> > > >>> >
> > > >>>
> > > >
> > >
> > >
> > >
> >
>
