This is a very good idea. That way, we can provide a default implementation of that operator, and users can control how the tuples are stored by supplying their own implementation. How many windows the operator delays by then becomes part of that operator's implementation.
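To make the idea a bit more concrete, here is a rough sketch of what the core of such a delay operator could look like. It is plain Java and deliberately does not use any Apex interfaces; the class name WindowDelayBuffer and its methods are made up for illustration, and it assumes window IDs are consecutive longs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical sketch of a delay operator's core logic (not an Apex API).
 * A tuple received in window w is held and released in window w + delay.
 */
class WindowDelayBuffer<T> {
  // Number of windows each tuple is held back (assumed >= 1).
  private final int delay;
  // Buffered tuples, keyed by the window in which they must be released.
  private final TreeMap<Long, List<T>> pending = new TreeMap<>();
  private long currentWindowId;

  WindowDelayBuffer(int delay) {
    if (delay < 1) {
      throw new IllegalArgumentException("delay must be >= 1");
    }
    this.delay = delay;
  }

  /** Starts a window and returns the tuples that are due for release in it. */
  List<T> beginWindow(long windowId) {
    currentWindowId = windowId;
    List<T> due = pending.remove(windowId);
    return due == null ? new ArrayList<T>() : due;
  }

  /** Buffers a tuple received in the current window. */
  void process(T tuple) {
    long releaseWindow = currentWindowId + delay;
    pending.computeIfAbsent(releaseWindow, k -> new ArrayList<T>()).add(tuple);
  }

  /** Number of future windows that still have buffered tuples. */
  int pendingWindows() {
    return pending.size();
  }
}
```

With a delay of 2, a tuple processed in window 13 comes back from beginWindow(15). A user-supplied implementation could keep the same shape and swap the in-memory map for whatever storage it prefers.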
I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute and
introducing a DELAY_OPERATOR_CLASS attribute so that the user can specify
the delay operator class to be used.

More thoughts?

David

On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta <gau...@datatorrent.com> wrote:

> Hey David,
>
> I was thinking: can we add another operator in front of the input port
> that has ITERATION_WINDOW_COUNT set? The new operator would have a
> property whose value is set equal to ITERATION_WINDOW_COUNT, and it
> would be responsible for caching the data for that many windows and
> delaying it. This operator can act as a unifier-cum-iterator operator.
> For this you may not need any external storage agent, as platform
> checkpointing should help you here.
>
> We are doing something similar for Sliding window.
>
> Thanks
> -Gaurav
>
> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi all,
> >
> > One current disadvantage of Apex is the inability to do iterations and
> > machine learning algorithms, because we don't allow loops in the
> > application DAG (hence the name DAG). I am proposing that we allow
> > loops in the DAG if the loop advances the window ID by a configured
> > amount. A JIRA ticket has been created:
> >
> > https://malhar.atlassian.net/browse/APEX-60
> >
> > I have started this work in my fork at
> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
> >
> > The current progress is that a simple test case works. Major work still
> > needs to be done with respect to recovery and partitioning.
> >
> > The value ITERATION_WINDOW_COUNT is an attribute of an input port of an
> > operator. If the value of the attribute is greater than or equal to 1,
> > any tuples sent to the input port are treated as being
> > ITERATION_WINDOW_COUNT windows ahead of their actual window.
> >
> > For recovery, we will need to checkpoint all the tuples between the
> > ports in order to replay the looped tuples.
> > During recovery, if an operator that has an input port with
> > ITERATION_WINDOW_COUNT=2 is recovering from checkpoint window 14, the
> > tuples for that input port from window 13 and window 14 need to be
> > replayed and treated as window 15 and window 16 respectively (13+2 and
> > 14+2).
> >
> > In other words, we need to store all the tuples from the window with ID
> > committedWindowId minus ITERATION_WINDOW_COUNT onward for recovery, and
> > purge the tuples earlier than that window.
> > We can optimize this by only storing the tuples for
> > ITERATION_WINDOW_COUNT windows prior to any checkpoint.
> >
> > For that, we need a storage mechanism for the tuples. Chandni already
> > has something that fits this use case in Apex Malhar. The class is
> > IdempotentStorageManager. In order for this to be used in Apex Core, we
> > need to deprecate the class in Apex Malhar and move it to Apex Core.
> >
> > A JIRA ticket has been created for this particular work:
> >
> > https://malhar.atlassian.net/browse/APEX-128
> >
> > Some of the above has been discussed among Thomas, Chetan, Chandni, and
> > myself.
> >
> > For partitioning, we have not started any discussion or brainstorming.
> > We appreciate any feedback on this and any other aspect related to
> > supporting iterations in general.
> >
> > Thanks!
> >
> > David
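The replay and purge rules in the original proposal quoted above can be sketched roughly as follows. This is only an illustration of the window arithmetic: LoopTupleStore and its methods are hypothetical names, it keeps everything in memory, it is not the IdempotentStorageManager API, and it assumes window IDs are consecutive longs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the replay/purge bookkeeping for looped tuples. */
class LoopTupleStore<T> {
  private final int iterationWindowCount;
  // Tuples sent to the looped input port, keyed by the window they were sent in.
  private final TreeMap<Long, List<T>> tuplesByWindow = new TreeMap<>();

  LoopTupleStore(int iterationWindowCount) {
    if (iterationWindowCount < 1) {
      throw new IllegalArgumentException("iterationWindowCount must be >= 1");
    }
    this.iterationWindowCount = iterationWindowCount;
  }

  /** Records a tuple that was sent to the looped port in the given window. */
  void save(long windowId, T tuple) {
    tuplesByWindow.computeIfAbsent(windowId, k -> new ArrayList<T>()).add(tuple);
  }

  /**
   * Returns the tuples to replay when recovering from checkpointWindowId,
   * keyed by the window they must be treated as (source window + count).
   */
  TreeMap<Long, List<T>> replayFor(long checkpointWindowId) {
    long firstSource = checkpointWindowId - iterationWindowCount + 1;
    TreeMap<Long, List<T>> replay = new TreeMap<>();
    for (Map.Entry<Long, List<T>> e
        : tuplesByWindow.subMap(firstSource, true, checkpointWindowId, true).entrySet()) {
      replay.put(e.getKey() + iterationWindowCount, new ArrayList<T>(e.getValue()));
    }
    return replay;
  }

  /** Drops tuples from windows earlier than committedWindowId - count. */
  void purge(long committedWindowId) {
    tuplesByWindow.headMap(committedWindowId - iterationWindowCount, false).clear();
  }

  /** Window IDs that still have stored tuples, in ascending order. */
  List<Long> storedWindows() {
    return new ArrayList<Long>(tuplesByWindow.keySet());
  }
}
```

With ITERATION_WINDOW_COUNT=2, replayFor(14) returns the tuples of windows 13 and 14 under the keys 15 and 16, and purge(14) drops everything before window 12.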