Why not set the delay operator as an attribute? We already support partitioners and stream codecs as attributes.
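Here is a minimal Java sketch of this suggestion. It assumes a hypothetical port attribute that holds a configured delay operator *instance* rather than a Class object. That design also answers the objection quoted below, that a class attribute does not let you configure the delay operator itself. `PortAttribute`, `InputPortMeta`, `DelayOperator`, `DefaultDelayOperator` and `DELAY_OPERATOR` are all hypothetical names for illustration. This is not the Apex `Attribute` API.

```java
import java.util.HashMap;
import java.util.Map;

class DelayAttributeSketch {
  // Hypothetical typed attribute key; NOT the Apex Attribute API.
  static final class PortAttribute<T> {
    final String name;
    PortAttribute(String name) { this.name = name; }
  }

  // Hypothetical delay operator contract: how many windows to delay is an
  // implementation detail of the operator, as suggested in the thread.
  interface DelayOperator {
    int getDelayWindows();
  }

  // Hypothetical default implementation with a configurable property.
  static final class DefaultDelayOperator implements DelayOperator {
    private int delayWindows = 1;

    void setDelayWindows(int delayWindows) {
      if (delayWindows < 1) throw new IllegalArgumentException("delay must be >= 1");
      this.delayWindows = delayWindows;
    }

    @Override
    public int getDelayWindows() { return delayWindows; }
  }

  // Hypothetical per-input-port attribute map that stores configured instances.
  static final class InputPortMeta {
    private final Map<PortAttribute<?>, Object> attributes = new HashMap<>();

    <T> void setAttribute(PortAttribute<T> key, T value) { attributes.put(key, value); }

    @SuppressWarnings("unchecked")
    <T> T getAttribute(PortAttribute<T> key) { return (T) attributes.get(key); }
  }

  // Hypothetical attribute whose value is an operator instance, not a Class object.
  static final PortAttribute<DelayOperator> DELAY_OPERATOR = new PortAttribute<>("DELAY_OPERATOR");

  public static void main(String[] args) {
    DefaultDelayOperator delay = new DefaultDelayOperator();
    delay.setDelayWindows(2); // configured before it is attached to the port
    InputPortMeta port = new InputPortMeta();
    port.setAttribute(DELAY_OPERATOR, delay);
    System.out.println(port.getAttribute(DELAY_OPERATOR).getDelayWindows()); // prints 2
  }
}
```

Storing an instance keeps the attribute mechanism unchanged. The user can still substitute a custom `DelayOperator` implementation.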
On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pra...@datatorrent.com> wrote:
> How about making just the window delay an attribute on the input port? Connecting
> the operator would then be just like creating a normal DAG stream. We could
> also support connecting the same operator to multiple input ports with
> different delays and handle fault recovery accordingly.
>
> On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote:
>
> > The iteration operator actually resembles the usage of unifiers. We have
> > getUnifier in the interface of OutputPort.
> >
> > But if we add getDelayOperator to the interface of InputPort, that would
> > break backward compatibility, especially since we can't rely on Java 8's
> > default-method feature for interfaces.
> >
> > Putting the class object as an attribute of the InputPort is not good
> > either, because you can't configure the delay operator itself.
> >
> > Thoughts?
> >
> > David
> >
> > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com> wrote:
> >
> > > This is a very good idea. This way, we can have a default implementation
> > > of that operator, and the user can control how the tuples are stored by
> > > providing his/her own implementation. How many windows the operator delays
> > > is part of the implementation of that operator.
> > >
> > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute and
> > > introducing a DELAY_OPERATOR_CLASS attribute so that the user can specify
> > > the delay operator class to be used.
> > >
> > > More thoughts?
> > >
> > > David
> > >
> > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gau...@datatorrent.com> wrote:
> > >
> > > > Hey David,
> > > >
> > > > I was thinking: can we add another operator in front of the input port
> > > > that has ITERATION_WINDOW_COUNT set?
> > > > The new additional operator would have a property whose value is set
> > > > equal to ITERATION_WINDOW_COUNT, and it would be responsible for caching
> > > > the data for that many windows and delaying it. This operator can act as
> > > > a combined unifier and iterator operator. For this, you may not need any
> > > > external storage agent, as platform checkpointing should help here.
> > > >
> > > > We are doing something similar for the sliding window.
> > > >
> > > > Thanks
> > > > -Gaurav
> > > >
> > > > On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > One current disadvantage of Apex is the inability to run iterative
> > > > > algorithms, such as many machine learning algorithms, because we don't
> > > > > allow loops in the application DAG (hence the name DAG). I am proposing
> > > > > that we allow loops in the DAG if the loop advances the window ID by a
> > > > > configured amount. A JIRA ticket has been created:
> > > > >
> > > > > https://malhar.atlassian.net/browse/APEX-60
> > > > >
> > > > > I have started this work in my fork at
> > > > > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> > > > >
> > > > > The current progress is that a simple test case works. Major work still
> > > > > needs to be done with respect to recovery and partitioning.
> > > > >
> > > > > ITERATION_WINDOW_COUNT is an attribute of an operator's input port. If
> > > > > the value of the attribute is greater than or equal to 1, any tuples sent
> > > > > to the input port are treated as if they were ITERATION_WINDOW_COUNT
> > > > > windows ahead of the window in which they were actually sent.
> > > > >
> > > > > For recovery, we will need to checkpoint all the tuples sent to input
> > > > > ports that have this attribute set, in order to replay the looped tuples.
> > > > > During recovery, if an operator that has an input port with
> > > > > ITERATION_WINDOW_COUNT=2 is recovering from checkpoint window 14, the
> > > > > tuples for that input port from windows 13 and 14 need to be replayed
> > > > > and treated as windows 15 and 16, respectively (13+2 and 14+2).
> > > > >
> > > > > In other words, we need to store all the tuples starting from the window
> > > > > with ID committedWindowId minus ITERATION_WINDOW_COUNT for recovery, and
> > > > > purge the tuples from earlier windows.
> > > > > We can optimize this by only storing the tuples for ITERATION_WINDOW_COUNT
> > > > > windows prior to any checkpoint.
> > > > >
> > > > > For that, we need a storage mechanism for the tuples. Chandni already has
> > > > > something that fits this use case in Apex Malhar: the class
> > > > > IdempotentStorageManager. In order for it to be used in Apex Core, we
> > > > > need to deprecate the class in Apex Malhar and move it to Apex Core.
> > > > >
> > > > > A JIRA ticket has been created for this particular work:
> > > > >
> > > > > https://malhar.atlassian.net/browse/APEX-128
> > > > >
> > > > > Some of the above has been discussed among Thomas, Chetan, Chandni, and
> > > > > myself.
> > > > >
> > > > > For partitioning, we have not started any discussion or brainstorming. We
> > > > > would appreciate any feedback on this and on any other aspect of
> > > > > supporting iterations in general.
> > > > >
> > > > > Thanks!
> > > > >
> > > > > David
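The window-shift and recovery rules from the original proposal can be sketched in Java as a per-port buffer. This is roughly the state a delay operator in front of the input port would hold, as Gaurav suggests, and platform checkpointing would persist it. `DelayBuffer` and its methods are hypothetical names. The sketch keeps tuples in memory and assumes nothing about how Apex or IdempotentStorageManager actually store them.

The retention boundary follows the worked example: with a checkpoint at window 14 and a count of 2, windows 13 and 14 are needed. The sketch therefore keeps windows strictly after committedWindowId minus ITERATION_WINDOW_COUNT. The quoted rule, "starting from" that window, would also keep window 12. That is safe but not required here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class IterationDelaySketch {
  // Hypothetical buffer for one input port with ITERATION_WINDOW_COUNT = delay.
  static final class DelayBuffer<T> {
    private final int delay;
    // Window ID in which tuples were sent -> tuples (in memory only in this sketch).
    private final TreeMap<Long, List<T>> stored = new TreeMap<>();

    DelayBuffer(int delay) {
      if (delay < 1) throw new IllegalArgumentException("delay must be >= 1");
      this.delay = delay;
    }

    // Records a tuple sent in windowId; returns the window it is treated as belonging to.
    long put(long windowId, T tuple) {
      stored.computeIfAbsent(windowId, k -> new ArrayList<>()).add(tuple);
      return windowId + delay;
    }

    // Tuples delivered in targetWindowId are the ones sent in targetWindowId - delay.
    List<T> tuplesFor(long targetWindowId) {
      return stored.getOrDefault(targetWindowId - delay, List.of());
    }

    // After recovering from checkpointWindowId, windows (checkpoint - delay, checkpoint]
    // are replayed, each shifted forward by delay.
    Map<Long, List<T>> replayAfterRecovery(long checkpointWindowId) {
      Map<Long, List<T>> replay = new TreeMap<>();
      for (long w = checkpointWindowId - delay + 1; w <= checkpointWindowId; w++) {
        replay.put(w + delay, stored.getOrDefault(w, List.of()));
      }
      return replay;
    }

    // Drops every window at or before committedWindowId - delay; those can no longer be replayed.
    void purge(long committedWindowId) {
      stored.headMap(committedWindowId - delay + 1).clear();
    }

    int storedWindowCount() { return stored.size(); }
  }

  public static void main(String[] args) {
    DelayBuffer<String> buf = new DelayBuffer<>(2);
    for (long w = 11; w <= 14; w++) buf.put(w, "t" + w);
    System.out.println(buf.replayAfterRecovery(14)); // {15=[t13], 16=[t14]}
    buf.purge(14);
    System.out.println(buf.storedWindowCount()); // 2
  }
}
```

The `main` method reproduces the example from the thread. Recovering from window 14 with a count of 2 replays the tuples of windows 13 and 14 as windows 15 and 16. Purging after window 14 is committed leaves only those two windows stored.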