Instead of treating the stateless operator in a special way and missing corner cases, just have a dummy checkpoint; then there are no corner cases to handle.
There is a name for this solution:
https://en.wikipedia.org/wiki/Null_Object_pattern

On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <[email protected]> wrote:

> There is code in various places that deals with stateless operators in a
> special way even though a physical checkpoint does not exist on the disk.
> It is probably a matter of applying a similar thought process/logic
> correctly here.
>
> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <[email protected]> wrote:
>
> > Hmm! The fact that commitWindowId has moved up (right now in memory of
> > Stram) should mean that a complete set of checkpoints is available, i.e.
> > commitWindowId can be derived. Let's say that the next checkpoint window
> > also gets checkpointed across the app, and commitWindowId is in memory
> > but not yet written to stram-state; then upon relaunch the latest
> > commitWindowId should get computed correctly.
> >
> > This may be just about setting stateless operators to commitWindowId on
> > relaunch? aka bug/feature?
> >
> > Thks
> > Amol
> >
> > E:[email protected] | M: 510-449-2606 | Twitter: @amolhkekre
> > www.datatorrent.com | apex.apache.org
> > Join us at Apex Big Data World-San Jose
> > <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!
> >
> > On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <[email protected]>
> > wrote:
> >
> > > Do we need to save committedWindowId? Can't it be computed from
> > > existing checkpoints by walking through the DAG? We probably do this
> > > anyway and I suspect there is a minor bug somewhere in there. If an
> > > operator is stateless you could assume its checkpoint to be long max
> > > for the sake of computation and compute the committed window to be the
> > > lowest common checkpoint. If they are all stateless and you end up
> > > with long max you can start with a window id that reflects the current
> > > timestamp.
> > >
> > > Thanks
> > >
> > > On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <[email protected]> wrote:
> > >
> > > > commitWindowId could be computed from the existing checkpoints. That
> > > > solution still needs the purge to be done after commitWindowId is
> > > > confirmed to be saved in Stram state. Without this, the
> > > > commitWindowId computed from the checkpoints may have some
> > > > checkpoints missing.
> > > >
> > > > Thks
> > > > Amol
> > > >
> > > > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni
> > > > <[email protected]> wrote:
> > > >
> > > > > Can't the committedWindowId be calculated by looking at the
> > > > > physical plan and the existing checkpoints?
> > > > >
> > > > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Help Needed for APEXCORE-619
> > > > > >
> > > > > > Issue: When an application is relaunched after a long time with
> > > > > > stateless operators at the end of the DAG, the stateless
> > > > > > operators start with a very high windowId. In this case the
> > > > > > stateless operator ignores all the data received until the
> > > > > > upstream operator catches up with it. This breaks the
> > > > > > *at-least-once* guarantee on relaunch of the operator, or when
> > > > > > the master is killed and the application is restarted.
> > > > > >
> > > > > > Solutions:
> > > > > > - Fix the windowId for stateless leaf operators from the
> > > > > > upstream operator. But this has some issues when we have a join
> > > > > > with two upstream operators at different windowIds. If we set
> > > > > > the windowId to min(upstream windowId), then we need to
> > > > > > recalculate the new recovery window ids for the upstream paths
> > > > > > from this operator.
> > > > > >
> > > > > > - Another solution is to create an empty file in the checkpoint
> > > > > > directory for stateless operators. This will help us identify
> > > > > > the checkpoints of stateless operators during relaunch instead
> > > > > > of computing from the latest timestamp.
> > > > > >
> > > > > > - Bring the entire DAG to committedWindowId. This could be
> > > > > > achieved by writing committedWindowId in a journal. We need to
> > > > > > make sure that we are not purging the checkpointed state until
> > > > > > the committedWindowId is saved in the journal.
> > > > > >
> > > > > > Let me know your thoughts on this and your preferred solution.
> > > > > >
> > > > > > Regards,
> > > > > > -Tushar.
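
For readers following the thread, here is a minimal sketch of the computation Pramod suggests: treat a stateless operator's checkpoint as long max, take the lowest checkpoint across operators, and fall back to a timestamp-derived window id when every operator is stateless. This is not Apex code. The class name, the map of operator names to latest checkpoint windows, and the `fallbackWindowId` parameter are all hypothetical, and the sketch simplifies "lowest common checkpoint" to the minimum of each operator's most recent checkpoint rather than walking the DAG.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

final class CommittedWindowSketch {
    // Stand-in value for an operator with no checkpoint on disk (assumed convention).
    static final long STATELESS = Long.MAX_VALUE;

    /**
     * Returns the lowest latest-checkpoint window across all operators.
     * An empty OptionalLong marks a stateless operator and never lowers the result.
     * If every operator is stateless, returns fallbackWindowId, which the caller
     * is assumed to derive from the current timestamp.
     */
    static long committedWindow(Map<String, OptionalLong> latestCheckpoints,
                                long fallbackWindowId) {
        long committed = STATELESS;
        for (OptionalLong checkpoint : latestCheckpoints.values()) {
            committed = Math.min(committed, checkpoint.orElse(STATELESS));
        }
        return committed == STATELESS ? fallbackWindowId : committed;
    }

    public static void main(String[] args) {
        // Hypothetical operators: two stateful, one stateless leaf.
        Map<String, OptionalLong> checkpoints = new LinkedHashMap<>();
        checkpoints.put("input", OptionalLong.of(120L));
        checkpoints.put("join", OptionalLong.of(100L));
        checkpoints.put("console", OptionalLong.empty());
        System.out.println(committedWindow(checkpoints, 999L)); // prints 100

        // All operators stateless: the fallback window id is used.
        Map<String, OptionalLong> allStateless = new LinkedHashMap<>();
        allStateless.put("console", OptionalLong.empty());
        System.out.println(committedWindow(allStateless, 999L)); // prints 999
    }
}
```

With this shape the stateless leaf needs no special-case branch, which is the point of the dummy-checkpoint (Null Object) suggestion at the top of the thread. Whether the purge ordering Amol raises is still needed is not addressed by this sketch.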
