What you are looking for would require a guarantee that checkpointed is called before the next beginWindow.
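One way to avoid depending on that guarantee is to file the recorded paths under their window id when the window ends, so that a callback for window x can still pick out exactly the entries up to x even if later windows have already started. A minimal sketch in plain Java; the class and its method names are hypothetical, they only mirror the operator callbacks discussed in this thread and are not the Apex interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Apex API: remembers which paths should
// be deleted and in which streaming window the deletion was requested.
class PendingDeleteTracker {
  private long currentWindowId;
  // Paths recorded during the window that is currently open.
  private List<String> currentWindow = new ArrayList<>();
  // Closed windows, ordered by window id, that still hold undeleted paths.
  private final TreeMap<Long, List<String>> closedWindows = new TreeMap<>();

  void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  // Called during normal processing instead of deleting the file directly.
  void recordDelete(String path) {
    currentWindow.add(path);
  }

  // File the window's paths under its id so later callbacks can select by id.
  void endWindow() {
    if (!currentWindow.isEmpty()) {
      closedWindows.put(currentWindowId, currentWindow);
      currentWindow = new ArrayList<>();
    }
  }

  // Returns and forgets every path recorded in closed windows <= windowId.
  // Paths from later windows stay pending, even if those windows have
  // already started by the time this is called.
  List<String> drainUpTo(long windowId) {
    Map<Long, List<String>> covered = closedWindows.headMap(windowId, true);
    List<String> snapshot = new ArrayList<>();
    for (List<String> paths : covered.values()) {
      snapshot.addAll(paths);
    }
    covered.clear(); // clearing the view also removes the entries from closedWindows
    return snapshot;
  }
}
```

An operator could call drainUpTo(windowId) from whichever callback it trusts and hand the result to an asynchronous deleter; whether that should be checkpointed or committed is a separate decision that this sketch does not settle.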
Why don't you record the list in endWindow? Is it a performance concern? This
would then benefit from https://malhar.atlassian.net/browse/APEX-78

On Tue, Nov 10, 2015 at 11:20 PM, Bhupesh Chawda <[email protected]> wrote:

> I was trying to do some processing between two checkpoint windows.
>
> So, for example, an operator deletes some files from hdfs as part of normal
> processing within an application window. However, since hdfs deletes cannot
> be rolled back by the platform, I am trying NOT to delete the files from
> hdfs but just record them in a list. Now when a checkpoint happens, I need
> to take a snapshot of the list *as of the checkpoint* and later delete them
> asynchronously. This process of taking the snapshot is what I need to do
> between two checkpoint windows. This can be done anytime between the
> endWindow just before the checkpoint and beginWindow just after the
> checkpoint.
>
> So the question is, can this be done in the checkpointed callback?
>
> If the callback is asynchronous, i.e. it may be called after the next
> window starts, then probably checkpointed is not the right place to do it.
> On 11-Nov-2015 11:20 am, "Thomas Weise" <[email protected]> wrote:
>
> > There does not seem to be a problem with the order of the checkpointed
> > callbacks.
> >
> > I would however question why the callback is delayed until write to storage
> > is complete. From operator perspective, it can be called as soon as
> > serialization is complete, which is always synchronous.
> >
> > The checkpoint cannot be reported to StrAM until the file copy is done, as
> > currently implemented.
> >
> > Bhupesh, what problem were you looking to solve through the checkpointed
> > callback?
> >
> >
> > On Tue, Nov 10, 2015 at 3:57 PM, Chetan Narsude <[email protected]> wrote:
> >
> > > With async checkpointing the Node.reportStats is reporting it back and it
> > > looks like the code is checking for the top of the queue to see if it’s
> > > done (or not reporting at all). So I do not see a reason why they will be
> > > reported out of order.
> > >
> > > Chandni, I read your subsequent responses. Valid point in the last email
> > > about documentation. It amazes me how much the documentation of our API
> > > can be improved (self guilty probably the most). One comment that I wanted
> > > to make even before that is that the semantics of copyToHdfs is that it
> > > either succeeds or throws exception in which case the recovery kicks in
> > > and nothing is reported as checkpointed.
> > >
> > > Tim, we need the unit test, man!
> > >
> > > --
> > > Chetan
> > >
> > >
> > > On 11/10/15, 3:05 PM, "Chetan Narsude (cnarsude)" <[email protected]>
> > > wrote:
> > >
> > > >There are a lot of things which are different when it comes to async
> > > >checkpointing. I was evaluating it in the morning and expect that either I
> > > >am able to explain or open jira issues. With my partial observation is
> > > >that with Async checkpointing, checkpointed is not issued (chandni, the
> > > >last statement in the if block is "return"). I am digging into it but feel
> > > >free to chime in if someone else is able to find that.
> > > >
> > > >Also I realized that my morning email applies as it is to committed but
> > > >checkpointed has deviated a little bit from that. Will post the revised
> > > >response soon.
> > > >
> > > >--
> > > >Chetan
> > > >
> > > >
> > > >On 11/10/15, 2:04 PM, "Chandni Singh" <[email protected]> wrote:
> > > >
> > > >>Chetan,
> > > >>
> > > >>Looking at the checkpoint(windowId) in Node.java, I don't think the steps
> > > >>you mentioned are followed.
> > > >>
> > > >>*if (using AsyncFSStorageAgent) {*
> > > >>* asyncFSStorageAgent.copyToHdfs(...)*
> > > >>*}*
> > > >>*operator.checkpointed(windowId);*
> > > >>
> > > >>This means even copyToHdfs fails the operator is notified that the window
> > > >>is check-pointed.
> > > >>
> > > >>Are we saying that copyToHdfs will never fail with AsyncFSStorageAgent for
> > > >>a window since the operator is notified that the window is checkpointed?
> > > >>
> > > >>Chandni
> > > >>
> > > >>On Tue, Nov 10, 2015 at 11:33 AM, Timothy Farkas <[email protected]>
> > > >>wrote:
> > > >>
> > > >>> Will do
> > > >>>
> > > >>> On Tue, Nov 10, 2015 at 11:01 AM, Pramod Immaneni <[email protected]>
> > > >>> wrote:
> > > >>>
> > > >>> > Is there a unit test covering it? Otherwise can you write one to test the
> > > >>> > hypothesis.
> > > >>> >
> > > >>> > On Tue, Nov 10, 2015 at 11:00 AM, Timothy Farkas <[email protected]>
> > > >>> > wrote:
> > > >>> >
> > > >>> > > That is what it is looking like to me. The task is submitted
> > > >>> > > GenericNode#checkpoint line 504, then at the end of the
> > > >>> > > GenericNode#checkpoint line 531 checkpointed is called. I am likely missing
> > > >>> > > something, just would like to know what :)
> > > >>> > >
> > > >>> > > Tim
> > > >>> > >
> > > >>> > > On Tue, Nov 10, 2015 at 10:51 AM, Pramod Immaneni <[email protected]>
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > > > Tim,
> > > >>> > > >
> > > >>> > > > Are you suggesting that checkpointed is called before the checkpoint is
> > > >>> > > > completely persisted in the storage.
> > > >>> > > >
> > > >>> > > > Thanks
> > > >>> > > >
> > > >>> > > > On Tue, Nov 10, 2015 at 10:49 AM, Timothy Farkas <[email protected]>
> > > >>> > > > wrote:
> > > >>> > > >
> > > >>> > > > > Chetan,
> > > >>> > > > >
> > > >>> > > > > I do not see the process of reporting the checkpoint to stram, receiving
> > > >>> > > > > the ack, and then calling checkpointed. The logic I'm seeing in GenericNode
> > > >>> > > > > line 484 is that the checkpoint method is called, it spawns another thread
> > > >>> > > > > that writes to hdfs, and then checkpointed is immediately called
> > > >>> > > > > afterwards. I am missing something, can you give me some pointers so that I
> > > >>> > > > > can better understand the flow?
> > > >>> > > > >
> > > >>> > > > > Tim
> > > >>> > > > >
> > > >>> > > > > On Tue, Nov 10, 2015 at 10:33 AM, Munagala Ramanath <[email protected]>
> > > >>> > > > > wrote:
> > > >>> > > > >
> > > >>> > > > > > Chetan's answer provides a good explanation as well as clarifying that
> > > >>> > > > > > the difference can be more than 1.
> > > >>> > > > > >
> > > >>> > > > > > Since checkpointing (i.e. "commit notification" as Thomas refers to
> > > >>> > > > > > it) is asynchronous, I'm curious
> > > >>> > > > > > about whether the window ids in the checkpointed call are guaranteed
> > > >>> > > > > > to be sequential or if they could
> > > >>> > > > > > be out of order, i.e. can the checkpointed call see window id 101
> > > >>> > > > > > before it sees 100 ?
> > > >>> > > > > >
> > > >>> > > > > > Ram
> > > >>> > > > > >
> > > >>> > > > > > On Tue, Nov 10, 2015 at 10:27 AM, Bhupesh Chawda
> > > >>> > > > > > <[email protected]> wrote:
> > > >>> > > > > > > Hi Tim,
> > > >>> > > > > > > Thanks for the detailed explanation.
> > > >>> > > > > > > I understand that the sequence would be
> > > >>> > > > > > > beginWindow (x) -> endWindow (x) -> checkpointed (x) -> beginWindow
> > > >>> > > > > > > (x+1)
> > > >>> > > > > > >
> > > >>> > > > > > > However when I try to print out the window ids in beginWindow, endWindow
> > > >>> > > > > > > and checkpointed calls, I see x and x-1 respectively.
> > > >>> > > > > > > I.e. If the window just before checkpoint is 100, I see that the
> > > >>> > > > > > > checkpointed call had window id 99.
> > > >>> > > > > > >
> > > >>> > > > > > > Note: This is observed in the local mode of Apex.
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks
> > > >>> > > > > > > -Bhupesh
> > > >>> > > > > > > On 10-Nov-2015 11:25 pm, "Timothy Farkas" <[email protected]> wrote:
> > > >>> > > > > > >
> > > >>> > > > > > >> Hi Bhupesh,
> > > >>> > > > > > >>
> > > >>> > > > > > >> The sequencing of checkpoint called in relation to beginWindow and
> > > >>> > > > > > >> endWindow depends on how your APPLICATION_WINDOW_COUNT and
> > > >>> > > > > > >> CHECKPOINT_WINDOW_COUNT are configured. If the two WINDOW_COUNTs are not
> > > >>> > > > > > >> configured to be the same then it is possible that checkpointed is called
> > > >>> > > > > > >> within an application window. So the sequence of events would be this:
> > > >>> > > > > > >>
> > > >>> > > > > > >> beginWindow -> checkpointed -> endWindow
> > > >>> > > > > > >>
> > > >>> > > > > > >> If the APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are the same
> > > >>> > > > > > >> (default). Then the sequence of calls would be this:
> > > >>> > > > > > >>
> > > >>> > > > > > >> beginWindow (100) -> endWindow (100) -> checkpointed (100) -> beginWindow
> > > >>> > > > > > >> (101)
> > > >>> > > > > > >>
> > > >>> > > > > > >> The numbers in the sequence represent possible streaming window Ids that
> > > >>> > > > > > >> each call would be associated with.
> > > >>> > > > > > >>
> > > >>> > > > > > >> The StateMachine which calls these callbacks for non-input operators is in
> > > >>> > > > > > >> GenericNode.java.
> > > >>> > > > > > >>
> > > >>> > > > > > >> Tim
> > > >>> > > > > > >>
> > > >>> > > > > > >> On Tue, Nov 10, 2015 at 3:36 AM, Bhupesh Chawda <[email protected]>
> > > >>> > > > > > >> wrote:
> > > >>> > > > > > >>
> > > >>> > > > > > >> > Hi Chetan / Community,
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > Can someone please elaborate on why the window id supplied to
> > > >>> > > > > > >> > CheckpointListener and the Operator would differ.
> > > >>> > > > > > >> > I tried looking at the window ids of checkpointed() and the beginWindow()
> > > >>> > > > > > >> > calls and they differ by 1. Don't know why this should be the case.
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > Thanks.
> > > >>> > > > > > >> > -Bhupesh
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > On Thu, Sep 17, 2015 at 5:56 AM, Chetan Narsude <[email protected]>
> > > >>> > > > > > >> > wrote:
> > > >>> > > > > > >> >
> > > >>> > > > > > >> > > Short answer is yes.
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > All the control tuples are scheduled to be delivered outside of the
> > > >>> > > > > > >> > > window.
> > > >>> > > > > > >> > > As checkpointed callback is triggered because of CHECKPOINT control tuple,
> > > >>> > > > > > >> > > it will happen after endWindow and before the next beginWindow.
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > The windowId supplied to CheckpointListener and the one provided to
> > > >>> > > > > > >> > > Operator need not match even though the sequence is defined. So I am
> > > >>> > > > > > >> > > curious how you intend to use this knowledge.
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > --
> > > >>> > > > > > >> > > Chetan
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > On Tue, Sep 15, 2015 at 8:31 AM, Thomas Weise <[email protected]>
> > > >>> > > > > > >> > > wrote:
> > > >>> > > > > > >> > >
> > > >>> > > > > > >> > > > It has not changed the operator execution model. State serialization is
> > > >>> > > > > > >> > > > still synchronous, write to HDFS is taken out of the operator thread.
> > > >>> > > > > > >> > > >
> > > >>> > > > > > >> > > > On Tue, Sep 15, 2015 at 8:18 AM, Amol Kekre <[email protected]>
> > > >>> > > > > > >> > > > wrote:
> > > >>> > > > > > >> > > >
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > Sent too soon. Has asynchronous checkpointing changed this?
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > Amol
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > Sent from my iPhone
> > > >>> > > > > > >> > > > >
> > > >>> > > > > > >> > > > > > On Sep 15, 2015, at 12:38 AM, Bhupesh Chawda <[email protected]> wrote:
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > Hi All,
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > Is it safe to assume that the checkpointed() and the beginWindow() calls
> > > >>> > > > > > >> > > > > > are sequenced?
> > > >>> > > > > > >> > > > > > In other words, are these calls part of the same thread and may not run in
> > > >>> > > > > > >> > > > > > parallel?
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > Thanks.
> > > >>> > > > > > >> > > > > >
> > > >>> > > > > > >> > > > > > --
> > > >>> > > > > > >> > > > > > -Bhupesh
