Hmm! The fact that commitWindowId has moved up (right now only in the memory
of Stram) should mean that a complete set of checkpoints is available, i.e.
commitWindowId can be derived. Let's say the next checkpoint window also
gets checkpointed across the app, and the new commitWindowId is in memory but
not yet written to the Stram state. Then upon relaunch the latest
commitWindowId should still get computed correctly.
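A minimal sketch of that derivation, following the rule Pramod proposes further down in the thread: treat a stateless operator's checkpoint as Long.MAX_VALUE, take the lowest of the operators' latest checkpoints, and fall back to a timestamp-based window id if every operator is stateless. It is written in Python for brevity and is not Apex's actual implementation. Reading "lowest common checkpoint" as the minimum of each operator's latest checkpoint is an interpretation. The function name, the input shape (operator name mapped to checkpoint window ids found on disk, or `None` for a stateless operator), `NO_CHECKPOINT = -1`, and `fallback_window_id` are all assumptions.

```python
from typing import Mapping, Optional, Sequence

# Stand-in for Java's Long.MAX_VALUE: the "no constraint" value for stateless operators.
LONG_MAX = 2**63 - 1
# Hypothetical marker for a stateful operator that has no checkpoint on disk yet.
NO_CHECKPOINT = -1


def compute_committed_window(
    checkpoints: Mapping[str, Optional[Sequence[int]]],
    fallback_window_id: int,
) -> int:
    """Return the lowest of the operators' latest checkpoints (hypothetical helper).

    checkpoints maps an operator name to the window ids of its checkpoints
    found on disk, or to None if the operator is stateless.
    fallback_window_id is used when every operator is stateless, e.g. a
    window id derived from the current timestamp.
    """
    committed = LONG_MAX
    for windows in checkpoints.values():
        if windows is None:
            latest = LONG_MAX  # stateless: never lowers the committed window
        else:
            latest = max(windows, default=NO_CHECKPOINT)
        committed = min(committed, latest)
    # Only stateless operators: nothing on disk constrains the restart point.
    return fallback_window_id if committed == LONG_MAX else committed


plan = {"input": [10, 20, 30], "join": [10, 20], "console": None}
print(compute_committed_window(plan, fallback_window_id=999))  # 20
```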

This may just be about setting stateless operators to commitWindowId on
relaunch? A bug or a feature?
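One way that relaunch rule could look, as a hedged Python sketch rather than Apex code: a stateless operator resumes at the committed window instead of a time-derived window id, so it no longer discards the windows its upstream operators replay. The sketch additionally assumes, as in Tushar's third option below, that stateful operators are rolled back to their newest checkpoint not later than the committed window. `relaunch_start_windows`, the input shape, and `NO_CHECKPOINT = -1` are hypothetical.

```python
from typing import Dict, Mapping, Optional, Sequence

# Hypothetical marker for a stateful operator with no usable checkpoint.
NO_CHECKPOINT = -1


def relaunch_start_windows(
    checkpoints: Mapping[str, Optional[Sequence[int]]],
    committed_window: int,
) -> Dict[str, int]:
    """Window id each operator resumes from after relaunch (hypothetical helper).

    checkpoints maps an operator name to its checkpoint window ids on disk,
    or to None if the operator is stateless.
    """
    start: Dict[str, int] = {}
    for op, windows in checkpoints.items():
        if windows is None:
            # Stateless: resume at the committed window rather than at a window
            # id derived from the current time, so the operator does not drop
            # data that upstream operators replay from older checkpoints.
            start[op] = committed_window
        else:
            # Stateful: restore from the newest checkpoint that is not past the
            # committed window, so every operator restarts at or before it.
            start[op] = max(
                (w for w in windows if w <= committed_window),
                default=NO_CHECKPOINT,
            )
    return start


plan = {"input": [10, 20, 30], "join": [10, 20], "console": None}
print(relaunch_start_windows(plan, committed_window=20))
# {'input': 20, 'join': 20, 'console': 20}
```

Starting every operator at or before the committed window may reprocess some windows, which is acceptable under at-least-once semantics; the failure in APEXCORE-619 is the opposite case, a downstream operator starting ahead of its upstream.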

Thks
Amol



E:a...@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*

www.datatorrent.com  |  apex.apache.org

*Join us at Apex Big Data World-San Jose
<http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
<http://www.apexbigdata.com/san-jose-register.html>

On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com>
wrote:

> Do we need to save committedWindowId? Can't it be computed from the existing
> checkpoints by walking through the DAG? We probably do this anyway, and I
> suspect there is a minor bug somewhere in there. If an operator is
> stateless, you could assume its checkpoint is Long.MAX_VALUE for the sake of
> the computation and compute the committed window as the lowest common
> checkpoint. If all operators are stateless and you end up with
> Long.MAX_VALUE, you can start with a window id that reflects the current
> timestamp.
>
> Thanks
>
> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:
>
> > CommitWindowId could be computed from the existing checkpoints. That
> > solution still requires the purge to be done only after commitWindowId is
> > confirmed to be saved in the Stram state. Without this, some checkpoints
> > may be missing when commitWindowId is computed from them.
> >
> > Thks
> > Amol
> >
> > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com>
> > wrote:
> >
> > > Can't the committedWindowId be calculated by looking at the physical plan
> > > and the existing checkpoints?
> > >
> > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org>
> > > wrote:
> > >
> > > > Help needed for APEXCORE-619
> > > >
> > > > Issue: When an application is relaunched after a long time with
> > > > stateless operators at the end of the DAG, the stateless operators
> > > > start with a very high windowId. In this case the stateless operators
> > > > ignore all the data they receive until the upstream operators catch up
> > > > with them. This breaks the *at-least-once* guarantee when the operator
> > > > is relaunched, or when the master is killed and the application is
> > > > restarted.
> > > >
> > > > Solutions:
> > > > - Fix the windowId for stateless leaf operators from the upstream
> > > > operator. But this has issues when there is a join with two upstream
> > > > operators at different windowIds. If we set the windowId to
> > > > min(upstream windowId), then we need to recalculate the recovery
> > > > window ids for the upstream paths from this operator.
> > > >
> > > > - Another solution is to create an empty file in the checkpoint
> > > > directory for stateless operators. This will help us identify the
> > > > checkpoints of stateless operators during relaunch instead of computing
> > > > them from the latest timestamp.
> > > >
> > > > - Bring the entire DAG to committedWindowId. This could be achieved by
> > > > writing committedWindowId to a journal. We need to make sure that we
> > > > are not purging the checkpointed state until the committedWindowId is
> > > > saved in the journal.
> > > >
> > > > Let me know your thoughts on this and your preferred solution.
> > > >
> > > > Regards,
> > > > -Tushar.
> > > >
> > >
> >
>
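Tushar's third option, combined with Amol's point that the purge must wait until commitWindowId is saved, comes down to an ordering rule: journal first, purge second. The toy Python sketch below assumes an in-memory list stands in for the journal and a dict of window-id lists stands in for the checkpoint directory; `CheckpointStore`, `commit`, `recovery_window`, and `NO_COMMIT = -1` are hypothetical names, not part of Apex.

```python
from dataclasses import dataclass, field
from typing import Dict, List

# Hypothetical value meaning "nothing committed yet".
NO_COMMIT = -1


@dataclass
class CheckpointStore:
    """Toy model of checkpoint storage plus a commit journal (not Apex's API)."""

    checkpoints: Dict[str, List[int]]
    journal: List[int] = field(default_factory=list)

    def commit(self, committed_window: int) -> None:
        # Step 1: record the committed window id in the journal first. A real
        # implementation would need this write to be durable before step 2.
        self.journal.append(committed_window)
        # Step 2: only now purge checkpoints older than the committed window.
        # A crash between the steps leaves extra checkpoints, never too few.
        for op, windows in self.checkpoints.items():
            self.checkpoints[op] = [w for w in windows if w >= committed_window]

    def recovery_window(self) -> int:
        # On relaunch, the whole DAG restarts from the last journaled window.
        return self.journal[-1] if self.journal else NO_COMMIT


store = CheckpointStore({"input": [10, 20, 30], "join": [10, 20]})
store.commit(20)
print(store.checkpoints)        # {'input': [20, 30], 'join': [20]}
print(store.recovery_window())  # 20
```

If the two steps were swapped, a crash after the purge but before the journal write would leave the journal pointing at an older window whose checkpoints are already gone, which is the gap Amol describes.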
