Just to shed some more light, for everyone's benefit: the semantics of checkpointed is to tell the operator that its state has been preserved. The operator does not need to be told that its state is being collected (serialized), because it already knows when that operation starts, when it is in progress, and when it ends. There are good constructs for intercepting serialization that are not Apex-specific.
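One example of a serialization hook that is not Apex-specific: plain java.io serialization lets a class define private writeObject and readObject methods, which the JDK calls when the object's state is written and restored. The sketch below is illustrative only. The class and field names are hypothetical, and whether a storage agent invokes these hooks depends on the serializer it uses. If your agent serializes with Kryo rather than java.io (an assumption worth checking for your Apex version), these methods are not called, and Kryo's KryoSerializable interface is the analogous hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SerializationHookDemo {

  // Hypothetical operator-like state. Assumption: it is persisted with java.io
  // serialization; a Kryo-based storage agent would not call these hooks.
  static class CountingState implements Serializable {
    private static final long serialVersionUID = 1L;
    long count;
    // transient: not part of the serialized form, only records hook invocations
    transient List<String> events = new ArrayList<>();

    // Called by ObjectOutputStream while this object's state is being written.
    private void writeObject(ObjectOutputStream out) throws IOException {
      events.add("serialize-start");
      out.defaultWriteObject();
      events.add("serialize-end");
    }

    // Called by ObjectInputStream while the state is being restored.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      events = new ArrayList<>(); // transient fields come back null, so re-create it
      events.add("deserialized");
    }
  }

  static byte[] serialize(Object o) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(o);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    CountingState state = new CountingState();
    state.count = 42;
    CountingState restored = (CountingState) deserialize(serialize(state));
    System.out.println(state.events);    // [serialize-start, serialize-end]
    System.out.println(restored.count);  // 42
    System.out.println(restored.events); // [deserialized]
  }
}
```

The point is that the object itself observes the start and end of serialization, which is why checkpointed only needs to report that the state has been preserved.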
Bhupesh, for your use case you would need to keep a mapping between each window ID and the files that need to be processed during that window. You probably also need to do this work in the committed callback rather than in checkpointed (of course, I am guessing here, given my incomplete knowledge of your use case). dtIngest has a similar use case that we worked on more than a year ago; someone from that team can probably give you more insight into the problem we are trying to avoid.

--
Chetan

On 11/11/15, 11:38 AM, "Timothy Farkas" wrote:

>Hi Gaurav,
>
>I've added some tests to check some of the conditions under which checkpointed is called: https://github.com/apache/incubator-apex-core/pull/163
>
>If there are more ideas for things to test, I can add more unit tests as well.
>
>Also, I was wondering how a failure to copy data from local disk to HDFS is handled. If an exception is thrown, the thread doing the task will die, and then what? Is the task retried?
>
>Thanks,
>Tim
>
>On Wed, Nov 11, 2015 at 1:29 AM, Gaurav Gupta wrote:
>
>> Chetan,
>>
>> Your analysis is correct.
>>
>> Let me summarize again how AsyncFSStorageAgent works. It involves two steps:
>>
>> 1. Serializing the operator state to local disk.
>> 2. Moving this serialized state to HDFS.
>>
>> This is how the two steps work in different scenarios:
>>
>> 1. If PROCESSING_MODE is EXACTLY_ONCE for an operator, both steps happen in the operator thread. Once both steps are complete, the checkpointed() callback is made to the operator.
>> 2. In other scenarios, the first step happens in the operator thread and the second step is triggered in a different thread. During the reportStats call, if the second step is complete, the checkpointed() callback is made to the operator.
>>
>> Thanks
>> - Gaurav
>>
>> > On Nov 10, 2015, at 3:57 PM, Chetan Narsude wrote:
>> >
>> > With async checkpointing, Node.reportStats reports it back, and it looks like the code checks the top of the queue to see whether that checkpoint is done (and reports nothing otherwise). So I do not see a reason why they would be reported out of order.
>> >
>> > Chandni, I read your subsequent responses. Valid point in the last email about documentation. It amazes me how much the documentation of our API can be improved (I am probably the guiltiest party). One comment that I wanted to make even before that: the semantics of copyToHdfs is that it either succeeds or throws an exception, in which case recovery kicks in and nothing is reported as checkpointed.
>> >
>> > Tim, we need the unit test, man!
>> >
>> > --
>> > Chetan
>> >
>> > On 11/10/15, 3:05 PM, "Chetan Narsude (cnarsude)" wrote:
>> >
>> >> There are a lot of things that are different when it comes to async checkpointing. I was evaluating it this morning and expect to either explain the behavior or open JIRA issues. My partial observation is that with async checkpointing, checkpointed is not issued (Chandni, the last statement in the if block is "return"). I am digging into it, but feel free to chime in if someone else finds it first.
>> >>
>> >> I also realized that my morning email applies as is to committed, but checkpointed has deviated a little from it. I will post a revised response soon.
>> >>
>> >> --
>> >> Chetan
>> >>
>> >> On 11/10/15, 2:04 PM, "Chandni Singh" wrote:
>> >>
>> >>> Chetan,
>> >>>
>> >>> Looking at checkpoint(windowId) in Node.java, I don't think the steps you mentioned are followed.
>> >>>
>> >>>     if (using AsyncFSStorageAgent) {
>> >>>       asyncFSStorageAgent.copyToHdfs(...);
>> >>>     }
>> >>>     operator.checkpointed(windowId);
>> >>>
>> >>> This means that even if copyToHdfs fails, the operator is notified that the window is checkpointed.
>> >>>
>> >>> Are we saying that copyToHdfs will never fail with AsyncFSStorageAgent for a window, given that the operator is notified that the window is checkpointed?
>> >>>
>> >>> Chandni
>> >>>
>> >>> On Tue, Nov 10, 2015 at 11:33 AM, Timothy Farkas wrote:
>> >>>
>> >>>> Will do
>> >>>>
>> >>>> On Tue, Nov 10, 2015 at 11:01 AM, Pramod Immaneni wrote:
>> >>>>
>> >>>>> Is there a unit test covering it? Otherwise, can you write one to test the hypothesis?
>> >>>>>
>> >>>>> On Tue, Nov 10, 2015 at 11:00 AM, Timothy Farkas wrote:
>> >>>>>
>> >>>>>> That is what it looks like to me. The task is submitted in GenericNode#checkpoint at line 504, and then at the end of GenericNode#checkpoint, at line 531, checkpointed is called. I am likely missing something; I would just like to know what :)
>> >>>>>>
>> >>>>>> Tim
>> >>>>>>
>> >>>>>> On Tue, Nov 10, 2015 at 10:51 AM, Pramod Immaneni wrote:
>> >>>>>>
>> >>>>>>> Tim,
>> >>>>>>>
>> >>>>>>> Are you suggesting that checkpointed is called before the checkpoint is completely persisted in the storage?
>> >>>>>>>
>> >>>>>>> Thanks
>> >>>>>>>
>> >>>>>>> On Tue, Nov 10, 2015 at 10:49 AM, Timothy Farkas wrote:
>> >>>>>>>
>> >>>>>>>> Chetan,
>> >>>>>>>>
>> >>>>>>>> I do not see the process of reporting the checkpoint to stram, receiving the ack, and then calling checkpointed.
>> >>>>>>>> The logic I'm seeing in GenericNode at line 484 is that the checkpoint method is called, it spawns another thread that writes to HDFS, and then checkpointed is called immediately afterwards. I am missing something; can you give me some pointers so that I can better understand the flow?
>> >>>>>>>>
>> >>>>>>>> Tim
>> >>>>>>>>
>> >>>>>>>> On Tue, Nov 10, 2015 at 10:33 AM, Munagala Ramanath wrote:
>> >>>>>>>>
>> >>>>>>>>> Chetan's answer provides a good explanation and also clarifies that the difference can be more than 1.
>> >>>>>>>>>
>> >>>>>>>>> Since checkpointing (i.e. "commit notification", as Thomas refers to it) is asynchronous, I'm curious whether the window IDs in the checkpointed call are guaranteed to be sequential or whether they could be out of order, i.e., can the checkpointed call see window ID 101 before it sees 100?
>> >>>>>>>>>
>> >>>>>>>>> Ram
>> >>>>>>>>>
>> >>>>>>>>> On Tue, Nov 10, 2015 at 10:27 AM, Bhupesh Chawda wrote:
>> >>>>>>>>>> Hi Tim,
>> >>>>>>>>>> Thanks for the detailed explanation.
>> >>>>>>>>>> I understand that the sequence would be
>> >>>>>>>>>> beginWindow (x) -> endWindow (x) -> checkpointed (x) -> beginWindow (x+1)
>> >>>>>>>>>>
>> >>>>>>>>>> However, when I print out the window IDs in the beginWindow, endWindow and checkpointed calls, I see x in beginWindow and endWindow but x-1 in checkpointed.
>> >>>>>>>>>> I.e., if the window just before the checkpoint is 100, I see that the checkpointed call had window ID 99.
>> >>>>>>>>>>
>> >>>>>>>>>> Note: This is observed in the local mode of Apex.
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks
>> >>>>>>>>>> -Bhupesh
>> >>>>>>>>>>
>> >>>>>>>>>> On 10-Nov-2015 11:25 pm, "Timothy Farkas" wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Bhupesh,
>> >>>>>>>>>>>
>> >>>>>>>>>>> The sequencing of the checkpointed call relative to beginWindow and endWindow depends on how your APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are configured. If the two window counts are not configured to be the same, checkpointed may be called within an application window, so the sequence of events would be:
>> >>>>>>>>>>>
>> >>>>>>>>>>> beginWindow -> checkpointed -> endWindow
>> >>>>>>>>>>>
>> >>>>>>>>>>> If APPLICATION_WINDOW_COUNT and CHECKPOINT_WINDOW_COUNT are the same (the default), the sequence of calls would be:
>> >>>>>>>>>>>
>> >>>>>>>>>>> beginWindow (100) -> endWindow (100) -> checkpointed (100) -> beginWindow (101)
>> >>>>>>>>>>>
>> >>>>>>>>>>> The numbers in the sequence represent possible streaming window IDs that each call would be associated with.
>> >>>>>>>>>>>
>> >>>>>>>>>>> The StateMachine that calls these callbacks for non-input operators is in GenericNode.java.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Tim
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Tue, Nov 10, 2015 at 3:36 AM, Bhupesh Chawda wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi Chetan / Community,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Can someone please elaborate on why the window ID supplied to CheckpointListener and the one supplied to the Operator would differ?
>> >>>>>>>>>>>> I looked at the window IDs of the checkpointed() and beginWindow() calls, and they differ by 1. I don't know why this should be the case.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks.
>> >>>>>>>>>>>> -Bhupesh
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Thu, Sep 17, 2015 at 5:56 AM, Chetan Narsude wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> Short answer is yes.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> All control tuples are scheduled to be delivered outside of the window. Since the checkpointed callback is triggered by the CHECKPOINT control tuple, it will happen after endWindow and before the next beginWindow.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> The windowId supplied to CheckpointListener and the one provided to the Operator need not match, even though the sequence is defined. So I am curious how you intend to use this knowledge.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> --
>> >>>>>>>>>>>>> Chetan
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Tue, Sep 15, 2015 at 8:31 AM, Thomas Weise wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> It has not changed the operator execution model. State serialization is still synchronous; the write to HDFS is taken out of the operator thread.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Tue, Sep 15, 2015 at 8:18 AM, Amol Kekre wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Sent too soon. Has asynchronous checkpointing changed this?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Amol
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Sent from my iPhone
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> On Sep 15, 2015, at 12:38 AM, Bhupesh Chawda wrote:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Hi All,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Is it safe to assume that the checkpointed() and beginWindow() calls are sequenced?
>> >>>>>>>>>>>>>>>> In other words, are these calls made from the same thread, so that they cannot run in parallel?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Thanks.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>> -Bhupesh
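Gaurav's description of AsyncFSStorageAgent (step 1 always on the operator thread; step 2 on the operator thread for EXACTLY_ONCE, otherwise on another thread, with checkpointed delivered from reportStats) and Chetan's point that reportStats only looks at the head of the queue can be modeled roughly as follows. This is a hypothetical, simplified model, not Apex source: the class name, method names, the single-thread executor and the FIFO queue are all assumptions, serialization and the HDFS copy are simulated, and a failed copy simply surfaces as an exception rather than going through Apex's real recovery path.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the two-step flow described in the thread. This is not the
// real AsyncFSStorageAgent or Node code; serialization and the copy are simulated.
public class AsyncCheckpointModel {
  // A checkpoint whose copy step may still be running in the background.
  private static final class Pending {
    final long windowId;
    final Future<?> copy;

    Pending(long windowId, Future<?> copy) {
      this.windowId = windowId;
      this.copy = copy;
    }
  }

  private final ExecutorService copier = Executors.newSingleThreadExecutor();
  private final Deque<Pending> pending = new ArrayDeque<>();
  private final List<Long> checkpointedCalls = new ArrayList<>();

  // Called on the operator thread at a checkpoint boundary.
  public void checkpoint(long windowId, boolean exactlyOnce) {
    byte[] local = serializeLocally(windowId); // step 1: always on the operator thread
    if (exactlyOnce) {
      copyToHdfs(local);      // step 2 also runs on the operator thread
      checkpointed(windowId); // notify only after both steps have completed
    } else {
      // step 2 runs on another thread; notification is deferred to reportStats()
      pending.addLast(new Pending(windowId, copier.submit(() -> copyToHdfs(local))));
    }
  }

  // Called periodically on the operator thread (reportStats in the thread).
  public void reportStats() throws InterruptedException {
    // Only the head of the queue is examined, so notifications are delivered in
    // window order even if a later copy happens to finish first.
    while (!pending.isEmpty() && pending.peekFirst().copy.isDone()) {
      Pending p = pending.pollFirst();
      try {
        p.copy.get(); // a failed copy surfaces here instead of being reported
      } catch (ExecutionException e) {
        throw new IllegalStateException("copy failed for window " + p.windowId, e.getCause());
      }
      checkpointed(p.windowId);
    }
  }

  public List<Long> checkpointedCalls() {
    return checkpointedCalls;
  }

  public void shutdownAndWait() throws InterruptedException {
    copier.shutdown();
    copier.awaitTermination(10, TimeUnit.SECONDS);
  }

  private byte[] serializeLocally(long windowId) {
    return Long.toString(windowId).getBytes(StandardCharsets.UTF_8); // stand-in for real state
  }

  private void copyToHdfs(byte[] data) {
    // Stand-in for the upload; per the thread, a real copy either succeeds or throws.
  }

  private void checkpointed(long windowId) {
    checkpointedCalls.add(windowId);
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncCheckpointModel model = new AsyncCheckpointModel();
    model.checkpoint(59, false);
    model.checkpoint(119, false);
    model.shutdownAndWait(); // let both background copies finish
    model.reportStats();
    System.out.println(model.checkpointedCalls()); // [59, 119]
  }
}
```

Draining only from the head of the queue is what gives the in-order guarantee Ram asked about: a later checkpoint is never reported while an earlier one is still pending.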
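Chetan's suggestion to Bhupesh (keep a mapping from each window ID to the files that must be processed for that window, and act on it in the committed callback rather than in checkpointed) can be sketched as below. This is a self-contained illustration, not Apex code. The class and its methods are hypothetical: beginWindow, checkpointed and committed only mirror the callback names discussed in the thread, and "finalizing" a file is modeled by appending it to a list. It also assumes, as the usual rationale for committed, that a merely checkpointed window may still be replayed during recovery while a committed one will not be.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, framework-free sketch. Method names mirror the Apex callbacks
// discussed in the thread, but this class does not implement any Apex interface.
public class WindowFileTracker {
  // Window ID -> files handled during that window. In a real operator this map
  // would be part of the checkpointed state, so it survives recovery.
  private final TreeMap<Long, List<String>> filesByWindow = new TreeMap<>();
  // Stand-in for the irreversible side effect (e.g. moving or deleting a file).
  private final List<String> finalized = new ArrayList<>();
  private long currentWindowId;

  public void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  public void addFile(String path) {
    filesByWindow.computeIfAbsent(currentWindowId, id -> new ArrayList<>()).add(path);
  }

  public void checkpointed(long windowId) {
    // Intentionally does nothing. Assumption: a checkpointed window may still be
    // replayed during recovery, whereas a committed one will not be.
  }

  public void committed(long windowId) {
    // Finalize every window up to and including windowId, so this does not rely on
    // committed() being called once per window.
    NavigableMap<Long, List<String>> done = filesByWindow.headMap(windowId, true);
    for (List<String> files : done.values()) {
      finalized.addAll(files);
    }
    done.clear(); // clearing the view also removes these entries from filesByWindow
  }

  public List<String> finalizedFiles() {
    return finalized;
  }

  public int pendingWindows() {
    return filesByWindow.size();
  }

  public static void main(String[] args) {
    WindowFileTracker tracker = new WindowFileTracker();
    tracker.beginWindow(100);
    tracker.addFile("a.txt");
    tracker.beginWindow(101);
    tracker.addFile("b.txt");
    tracker.beginWindow(102);
    tracker.addFile("c.txt");
    tracker.checkpointed(102);
    tracker.committed(101);
    System.out.println(tracker.finalizedFiles()); // [a.txt, b.txt]
    System.out.println(tracker.pendingWindows()); // 1
  }
}
```

Using a sorted map and finalizing everything at or below the committed ID keeps the logic correct even when the committed window ID jumps by more than 1, which the thread notes can happen.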
