Instead of treating stateless operators as a special case and missing corner
cases, just write a dummy checkpoint for them. Then there are no corner cases
left to handle.

There is a name for this solution,
https://en.wikipedia.org/wiki/Null_Object_pattern
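
As a minimal sketch of the pattern in Java: every name below (Checkpoint,
SavedCheckpoint, StatelessCheckpoint, Recovery) is hypothetical and not an
Apex class, and the committed window is simplified here to the minimum of
each operator's latest checkpoint window.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical types for illustration only; these are not Apex APIs.
interface Checkpoint {
    long windowId();
    boolean hasState();
}

// A checkpoint whose operator state was actually written to storage.
final class SavedCheckpoint implements Checkpoint {
    private final long windowId;
    SavedCheckpoint(long windowId) { this.windowId = windowId; }
    public long windowId() { return windowId; }
    public boolean hasState() { return true; }
}

// Null object: the dummy checkpoint of a stateless operator.
// It records a window id like any other checkpoint but has nothing to restore.
final class StatelessCheckpoint implements Checkpoint {
    private final long windowId;
    StatelessCheckpoint(long windowId) { this.windowId = windowId; }
    public long windowId() { return windowId; }
    public boolean hasState() { return false; }
}

final class Recovery {
    // Simplified assumption: committed window = lowest latest-checkpoint window.
    // No branch for stateless operators is needed; their dummy checkpoints
    // take part in the minimum like everyone else's.
    // Returns Long.MAX_VALUE when the list is empty.
    static long committedWindowId(List<Checkpoint> latestPerOperator) {
        long min = Long.MAX_VALUE;
        for (Checkpoint c : latestPerOperator) {
            min = Math.min(min, c.windowId());
        }
        return min;
    }
}

final class NullCheckpointDemo {
    public static void main(String[] args) {
        List<Checkpoint> latest = Arrays.asList(
            new SavedCheckpoint(120L),
            new SavedCheckpoint(100L),
            new StatelessCheckpoint(110L)); // stateless leaf operator
        System.out.println(Recovery.committedWindowId(latest)); // prints 100
    }
}
```

The recovery code never asks whether an operator is stateless; the only
difference is that a StatelessCheckpoint has no state to load on restart.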



On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <[email protected]>
wrote:

> There is code in various places that deals with stateless operators in a
> special way even though a physical checkpoint does not exist on the disk.
> It is probably a matter of applying similar thought process/logic correctly
> here.
>
> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <[email protected]> wrote:
>
> > Hmm! The fact that commitWindowId has moved up (right now in memory of
> > Stram) should mean that a complete set of checkpoints is available, i.e.
> > commitWindowId can be derived. Let's say the next checkpoint window also
> > gets checkpointed across the app, and commitWindowId is in memory but not
> > yet written to stram-state; then upon relaunch the latest commitWindowId
> > should get computed correctly.
> >
> > This may be just about setting stateless operators to commitWindowId on
> > relaunch? aka bug/feature?
> >
> > Thks
> > Amol
> >
> >
> >
> > E:[email protected] | M: 510-449-2606 <(510)%20449-2606> | Twitter: @*amolhkekre*
> >
> > www.datatorrent.com  |  apex.apache.org
> >
> > *Join us at Apex Big Data World-San Jose
> > <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> > [image: http://www.apexbigdata.com/san-jose-register.html]
> > <http://www.apexbigdata.com/san-jose-register.html>
> >
> > On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <[email protected]>
> > wrote:
> >
> > > Do we need to save committedWindowId? Can't it be computed from existing
> > > checkpoints by walking through the DAG? We probably do this anyway, and I
> > > suspect there is a minor bug somewhere in there. If an operator is
> > > stateless, you could assume its checkpoint is long max for the sake of
> > > computation and compute the committed window as the lowest common
> > > checkpoint. If they are all stateless and you end up with long max, you
> > > can start with a window id that reflects the current timestamp.
> > >
> > > Thanks
> > >
> > > On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <[email protected]> wrote:
> > >
> > > > CommitWindowId could be computed from the existing checkpoints. That
> > > > solution still needs the purge to be done after commitWindowId is
> > > > confirmed to be saved in Stram state. Without this, the commitWindowId
> > > > computed from the checkpoints may have some checkpoints missing.
> > > >
> > > > Thks
> > > > Amol
> > > >
> > > >
> > > >
> > > > E:[email protected] | M: 510-449-2606 <(510)%20449-2606> |
> Twitter: @*amolhkekre*
> > > >
> > > > www.datatorrent.com  |  apex.apache.org
> > > >
> > > > *Join us at Apex Big Data World-San Jose
> > > > <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
> > > > [image: http://www.apexbigdata.com/san-jose-register.html]
> > > > <http://www.apexbigdata.com/san-jose-register.html>
> > > >
> > > > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <[email protected]> wrote:
> > > >
> > > > > Can't the committedWindowId be calculated by looking at the physical
> > > > > plan and the existing checkpoints?
> > > > >
> > > > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <[email protected]> wrote:
> > > > >
> > > > > > Help Needed for APEXCORE-619
> > > > > >
> > > > > > > Issue: When an application is relaunched after a long time with
> > > > > > > stateless operators at the end of the DAG, the stateless operators
> > > > > > > start with a very high windowId. In this case the stateless
> > > > > > > operator ignores all the data received until the upstream operator
> > > > > > > catches up with it. This breaks the *at-least-once* guarantee on
> > > > > > > relaunch of the operator, or when the master is killed and the
> > > > > > > application is restarted.
> > > > > >
> > > > > > Solutions:
> > > > > > > - Fix the windowId for stateless leaf operators from the upstream
> > > > > > > operator. But this has some issues when we have a join with two
> > > > > > > upstream operators at different windowIds. If we set the windowId
> > > > > > > to min(upstream windowId), then we need to recalculate the new
> > > > > > > recovery window ids for the upstream paths from this operator.
> > > > > >
> > > > > > > - Another solution is to create an empty file in the checkpoint
> > > > > > > directory for stateless operators. This will help us identify the
> > > > > > > checkpoints of stateless operators during relaunch instead of
> > > > > > > computing them from the latest timestamp.
> > > > > >
> > > > > > > - Bring the entire DAG to committedWindowId. This could be achieved
> > > > > > > by writing committedWindowId to a journal. We need to make sure
> > > > > > > that we do not purge the checkpointed state until the
> > > > > > > committedWindowId is saved in the journal.
> > > > > >
> > > > > > > Let me know your thoughts on this and your preferred solution.
> > > > > >
> > > > > > Regards,
> > > > > > -Tushar.
> > > > > >
> > > > >
> > > >
> > >
> >
>