This is a very good idea.  This way, we can have a default implementation
of that operator, and the user can control how the tuples are stored by
providing their own implementation.  How many windows the operator delays is
part of the implementation of that operator.

I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute and
introducing a DELAY_OPERATOR_CLASS attribute so that the user can specify
the delay operator class to be used.
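To make this concrete, here is a minimal sketch of what a pluggable delay operator might look like. All of the names here (DelayOperator, InMemoryDelayOperator) are hypothetical illustrations for this discussion, not actual Apex APIs; a real implementation would hook into the operator lifecycle and a storage agent rather than a plain in-memory queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical contract for a pluggable delay operator: tuples processed in
// window W are released delayWindows windows later.
interface DelayOperator<T> {
    void beginWindow(long windowId);
    void process(T tuple);
    List<T> endWindow();  // tuples to emit at the end of this window
}

// Illustrative default implementation that buffers tuples in memory,
// one list per window, releasing each list delayWindows windows later.
class InMemoryDelayOperator<T> implements DelayOperator<T> {
    private final int delayWindows;
    private final Queue<List<T>> buffer = new ArrayDeque<>();
    private List<T> current;

    InMemoryDelayOperator(int delayWindows) {
        this.delayWindows = delayWindows;
    }

    public void beginWindow(long windowId) {
        current = new ArrayList<>();
    }

    public void process(T tuple) {
        current.add(tuple);
    }

    public List<T> endWindow() {
        buffer.add(current);
        // Only release a window's tuples once they are delayWindows old.
        return buffer.size() > delayWindows ? buffer.remove() : new ArrayList<T>();
    }
}

public class DelaySketch {
    public static void main(String[] args) {
        DelayOperator<String> delay = new InMemoryDelayOperator<>(2);
        for (int w = 1; w <= 3; w++) {
            delay.beginWindow(w);
            delay.process("t" + w);
            System.out.println("window " + w + " emits " + delay.endWindow());
        }
        // window 3 emits [t1]: the tuple from window 1, delayed by 2 windows
    }
}
```

A user-supplied DELAY_OPERATOR_CLASS could then swap the in-memory queue for spooling to disk or to the platform's checkpoint storage without changing the surrounding DAG.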

More thoughts?

David

On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gau...@datatorrent.com>
wrote:

> Hey David,
>
> I was thinking: can we add another operator in front of the input port
> that has ITERATION_WINDOW_COUNT set?  The new additional operator will
> have a property whose value will be set equal to ITERATION_WINDOW_COUNT,
> and it will be responsible for caching the data for that many windows and
> delaying the data.  This operator can act as a unifier-cum-iterator
> operator.  For this you may not need any external storage agent, as
> platform checkpointing should help you here.
>
> We are doing something similar for Sliding window.
>
> Thanks
> -Gaurav
>
> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi all,
> >
> > One current disadvantage of Apex is the inability to do iterations and
> > machine learning algorithms, because we don't allow loops in the
> > application DAG (hence the name DAG).  I am proposing that we allow
> > loops in the DAG if the loop advances the window ID by a configured
> > amount.  A JIRA ticket has been created:
> >
> > https://malhar.atlassian.net/browse/APEX-60
> >
> > I have started this work in my fork at
> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> >
> > The current progress is that a simple test case works.  Major work still
> > needs to be done with respect to recovery and partitioning.
> >
> > The value ITERATION_WINDOW_COUNT is an attribute on an input port of an
> > operator.  If the value of the attribute is greater than or equal to 1,
> > any tuples sent to that input port are treated as being
> > ITERATION_WINDOW_COUNT windows ahead of the window they arrive in.
> >
> > For recovery, we will need to checkpoint all the tuples in the loop in
> > order to replay the looped tuples.  During recovery, if an operator has
> > an input port with ITERATION_WINDOW_COUNT=2 and is recovering from
> > checkpoint window 14, the tuples for that input port from window 13 and
> > window 14 need to be replayed as window 15 and window 16 respectively
> > (13+2 and 14+2).
> >
> > In other words, for recovery we need to store all the tuples from
> > window ID (committedWindowId - ITERATION_WINDOW_COUNT) onward, and
> > purge the tuples from earlier windows.  We can optimize this by only
> > storing the tuples for the ITERATION_WINDOW_COUNT windows prior to each
> > checkpoint.
> >
> > For that, we need a storage mechanism for the tuples.  Chandni already
> > has something that fits this use case in Apex Malhar.  The class is
> > IdempotentStorageManager.  In order for this to be used in Apex Core,
> > we need to deprecate the class in Apex Malhar and move it to Apex Core.
> >
> > A JIRA ticket has been created for this particular work:
> >
> > https://malhar.atlassian.net/browse/APEX-128
> >
> > Some of the above has been discussed among Thomas, Chetan, Chandni, and
> > myself.
> >
> > For partitioning, we have not started any discussion or brainstorming.
> > We appreciate any feedback on this and any other aspect related to
> > supporting iterations in general.
> >
> > Thanks!
> >
> > David
> >
>
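As a side note for readers of this thread, the replay arithmetic in the quoted proposal (with ITERATION_WINDOW_COUNT=2 and checkpoint window 14, windows 13 and 14 are replayed as 15 and 16) can be sketched in plain Java. The class and method names here are illustrative only, not part of Apex:

```java
// Sketch of the recovery replay arithmetic from the proposal above: with
// ITERATION_WINDOW_COUNT = n, the tuples stored for the n windows up to and
// including the checkpoint window are replayed, each advanced by n.
public class ReplayMath {
    static long replayedWindowId(long storedWindowId, int iterationWindowCount) {
        return storedWindowId + iterationWindowCount;
    }

    public static void main(String[] args) {
        int iterationWindowCount = 2;
        long checkpointWindowId = 14;
        // Replays windows 13 and 14 as 15 and 16, matching the example above.
        long first = checkpointWindowId - iterationWindowCount + 1;
        for (long w = first; w <= checkpointWindowId; w++) {
            System.out.println("stored window " + w + " -> replayed as window "
                    + replayedWindowId(w, iterationWindowCount));
        }
    }
}
```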
