Why not set the delay operator as an attribute? We already support
partitioners and stream codecs as attributes.


On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pra...@datatorrent.com>
wrote:

> How about making just the window delay an attribute on the input port? The
> operator connection is just like a normal DAG stream creation. We could
> also support connecting the same operator to multiple input ports with
> different delays and handle fault recovery accordingly.
>
> On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:
>
> > The iteration operator actually resembles the usage of unifiers.  We have
> > getUnifier in the interface of OutputPort.
> >
> > But if we add getDelayOperator in the interface of InputPort, that would
> > introduce backward incompatibility especially since we can't use the
> > default implementation feature of interfaces that is in Java 8.
> >
> > Putting the class object as an attribute of the InputPort is not good
> > either because you can't configure the delay operator itself.
> >
> > Thoughts?
> >
> > David
> >
> > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com> wrote:
> >
> > > > This is a very good idea.  This way, we can have a default
> > > > implementation of that operator and the user can control how the tuples
> > > > are stored by having his/her own implementation.  How many windows the
> > > > operator delays is part of the implementation of that operator.
> > >
> > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute
> > > > and introducing a DELAY_OPERATOR_CLASS attribute so that the user can
> > > > specify the delay operator class to be used.
> > >
> > > More thoughts?
> > >
> > > David
> > >
> > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gau...@datatorrent.com>
> > > wrote:
> > >
> > >> Hey David,
> > >>
> > > >> I was thinking: can we add another operator in front of the input port
> > > >> that has ITERATION_WINDOW_COUNT set? The new operator will have a
> > > >> property whose value is set equal to ITERATION_WINDOW_COUNT, and it
> > > >> will be responsible for caching the data for that many windows and
> > > >> delaying the data. This operator can act as a unifier cum iterator
> > > >> operator. For this you may not need any external storage agent, as
> > > >> platform checkpointing should help you here.
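
The delay operator described above can be pictured with a minimal sketch in plain Java. It deliberately does not use the Apex operator API: the class name WindowDelayBuffer and the methods process and endWindow are hypothetical, and the sketch assumes the host calls endWindow exactly once per streaming window. Tuples collected in window w are released when window w + delayWindows closes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; not an Apex operator.
final class WindowDelayBuffer<T> {
    private final int delayWindows;
    // One entry per closed window, oldest first.
    private final Deque<List<T>> pending = new ArrayDeque<>();
    private List<T> current = new ArrayList<>();

    WindowDelayBuffer(int delayWindows) {
        if (delayWindows < 1) {
            throw new IllegalArgumentException("delayWindows must be >= 1");
        }
        this.delayWindows = delayWindows;
    }

    // Caches a tuple that arrived in the currently open window.
    void process(T tuple) {
        current.add(tuple);
    }

    // Closes the open window and returns the tuples collected delayWindows
    // windows earlier, or an empty list while the buffer is still filling.
    List<T> endWindow() {
        pending.addLast(current);
        current = new ArrayList<>();
        if (pending.size() > delayWindows) {
            return pending.removeFirst();
        }
        return Collections.emptyList();
    }
}
```

Because all cached tuples live in ordinary fields, a platform that checkpoints operator state at window boundaries would capture them without a separate storage agent, which is the point made above.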
> > >>
> > >> We are doing something similar for Sliding window.
> > >>
> > >> Thanks
> > >> -Gaurav
> > >>
> > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > > >> > One current disadvantage of Apex is the inability to do iterations
> > > >> > and machine learning algorithms because we don't allow loops in the
> > > >> > application DAG (hence the name DAG).  I am proposing that we allow
> > > >> > loops in the DAG if the loop advances the window ID by a configured
> > > >> > amount.  A JIRA ticket has been created:
> > >> >
> > >> > https://malhar.atlassian.net/browse/APEX-60
> > >> >
> > >> > I have started this work in my fork at
> > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> > >> >
> > > >> > The current progress is that a simple test case works.  Major work
> > > >> > still needs to be done with respect to recovery and partitioning.
> > >> >
> > > >> > The value ITERATION_WINDOW_COUNT is an attribute of an input port of
> > > >> > an operator.  If the value of the attribute is greater than or equal
> > > >> > to 1, any tuples sent to the input port are treated as being
> > > >> > ITERATION_WINDOW_COUNT windows ahead of what they are.
> > >> >
> > > >> > For recovery, we will need to checkpoint all the tuples between the
> > > >> > ports in order to replay the looped tuples.  During recovery, if an
> > > >> > operator that has an input port with ITERATION_WINDOW_COUNT=2 is
> > > >> > recovering from checkpoint window 14, the tuples for that input port
> > > >> > from window 13 and window 14 need to be replayed and treated as
> > > >> > window 15 and window 16 respectively (13+2 and 14+2).
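
The replay arithmetic in the example above can be written out as a small sketch. The class ReplayWindows and the method replayPlan are hypothetical names, and the sketch assumes window IDs are consecutive integers, which simplifies the real window ID encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; assumes consecutive integer window IDs.
final class ReplayWindows {
    // Maps each source window whose looped tuples must be replayed to the
    // window in which those tuples are treated as arriving.
    static Map<Long, Long> replayPlan(long checkpointWindowId, int iterationWindowCount) {
        if (iterationWindowCount < 1) {
            throw new IllegalArgumentException("iterationWindowCount must be >= 1");
        }
        Map<Long, Long> plan = new LinkedHashMap<>();
        // The last iterationWindowCount windows up to and including the checkpoint.
        long first = checkpointWindowId - iterationWindowCount + 1;
        for (long w = first; w <= checkpointWindowId; w++) {
            plan.put(w, w + iterationWindowCount);
        }
        return plan;
    }
}
```

Calling replayPlan(14, 2) yields the mapping 13 to 15 and 14 to 16, matching the numbers in the example.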
> > >> >
> > > >> > In other words, we need to store all the tuples from the window with
> > > >> > ID committedWindowId minus ITERATION_WINDOW_COUNT onward for recovery,
> > > >> > and purge the tuples earlier than that window.
> > > >> > We can optimize this by only storing the tuples for
> > > >> > ITERATION_WINDOW_COUNT windows prior to any checkpoint.
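
A minimal in-memory sketch of the retention rule above follows. It is not IdempotentStorageManager and does not persist anything; LoopTupleStore and its methods are hypothetical names, and window IDs are again assumed to be consecutive integers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory store illustrating the purge rule only.
final class LoopTupleStore<T> {
    private final int iterationWindowCount;
    // Tuples grouped by the window in which they were emitted, sorted by window ID.
    private final TreeMap<Long, List<T>> tuplesByWindow = new TreeMap<>();

    LoopTupleStore(int iterationWindowCount) {
        this.iterationWindowCount = iterationWindowCount;
    }

    void save(long windowId, T tuple) {
        tuplesByWindow.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
    }

    // Purges every window earlier than committedWindowId - iterationWindowCount.
    void committed(long committedWindowId) {
        long oldestNeeded = committedWindowId - iterationWindowCount;
        tuplesByWindow.headMap(oldestNeeded, false).clear();
    }

    int retainedWindowCount() {
        return tuplesByWindow.size();
    }

    // Throws NoSuchElementException if nothing is retained.
    long oldestRetainedWindow() {
        return tuplesByWindow.firstKey();
    }
}
```

With iterationWindowCount set to 2 and tuples saved for windows 10 through 14, calling committed(14) drops windows 10 and 11 and keeps windows 12 through 14.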
> > >> >
> > > >> > For that, we need a storage mechanism for the tuples.  Chandni
> > > >> > already has something that fits this use case in Apex Malhar.  The
> > > >> > class is IdempotentStorageManager.  In order for this to be used in
> > > >> > Apex Core, we need to deprecate the class in Apex Malhar and move it
> > > >> > to Apex Core.
> > >> >
> > >> > A JIRA ticket has been created for this particular work:
> > >> >
> > >> > https://malhar.atlassian.net/browse/APEX-128
> > >> >
> > > >> > Some of the above has been discussed among Thomas, Chetan, Chandni,
> > > >> > and myself.
> > >> >
> > > >> > For partitioning, we have not started any discussion or
> > > >> > brainstorming.  We appreciate any feedback on this and any other
> > > >> > aspect related to supporting iterations in general.
> > >> >
> > >> > Thanks!
> > >> >
> > >> > David
> > >> >
> > >>
> > >
> > >
> >
>
