This delay operator will act as an input operator for the first window and act as a regular operator after that. The engine will increment the window id of the windows from all the output ports of the delay operator.
We will need a new interface for the delay operator, extending the existing Operator interface. The interface will probably include: - Emitting the tuples for the first window - Emitting the tuples after recovery We will provide a default implementation of the delay operator with a write-ahead log that stores the tuples for the window before each checkpoint for recovery. We will also probably support the number of windows to delay using an operator property. Let's look at this DAG with an iteration loop: upstream --> A --> B --> downstream ^ | |-----| With the delay operator, the physical view of the DAG looks like this with D being the delay operator: upstream --> A --> B --> downstream ^ | |-D<--| There are two approaches for specifying the delay operator. 1) As discussed earlier on this thread, the delay operator can be specified as an *input port attribute* of A. The delay operator D will not appear in the logical DAG. The engine will do the +1 on the window ID based on the presence of the input port attribute. In this case, the delay operator does not need to specify any input port, just like the unifier, with the process(tuple) method implicitly taking in the tuples from the output port of B, which logically connects to the input port of A. 2) The delay operator is specified and connected *as any other operator* in the logical DAG. The engine will do the +1 on the window ID if the operator implements the delay operator interface. In this case, the delay operator D will need to specify at least one input port (just like a regular operator), and can actually have multiple input ports. I'm leaning toward the 2nd approach. Please share your thoughts. Which one you think is better? Or maybe suggest a different approach altogether? Thanks! David David On Wed, Oct 7, 2015 at 10:51 AM, Thomas Weise <tho...@datatorrent.com> wrote: > Why not set the the delay operator as attribute? We already support > partitioners and stream codecs as attribute. > > > On Wed, Oct 7, 2015 at 10:09 AM, Pramod Immaneni <pra...@datatorrent.com> > wrote: > > > How about making just the window delay an attribute on the input port. > The > > operator connection is just like a normal DAG stream creation. We could > > also support connecting same operator to multiple input ports with > > different delay and handle fault recovery accordingly. > > > > On Wed, Oct 7, 2015 at 9:53 AM, David Yan <da...@datatorrent.com> wrote: > > > > > The iteration operator actually resembles the usage of unifiers. We > have > > > getUnifier in the interface of OutputPort. > > > > > > But if we add getDelayOperator in the interface of InputPort, that > would > > > introduce backward incompatibility especially since we can't use the > > > default implementation feature of interfaces that is in Java 8. > > > > > > Putting the class object as an attribute of the InputPort is not good > > > either because you can't configure the delay operator itself. > > > > > > Thoughts? > > > > > > David > > > > > > On Fri, Sep 25, 2015 at 10:10 AM, David Yan <da...@datatorrent.com> > > wrote: > > > > > > > This is a very good idea. This way, we can have a default > > implementation > > > > of that operator and the user can control how the tuples are stored > by > > > > having his/her own implementation. How many windows the operator > > delays > > > is > > > > part of the implementation of that operator. > > > > > > > > I am thinking of getting rid of the ITERATION_WINDOW_OFFSET attribute > > and > > > > introduce a DELAY_OPERATOR_CLASS attribute so that the user can > specify > > > the > > > > delay operator class to be used. > > > > > > > > More thoughts? > > > > > > > > David > > > > > > > > On Thu, Sep 17, 2015 at 7:16 PM, Gaurav Gupta < > gau...@datatorrent.com> > > > > wrote: > > > > > > > >> Hey David, > > > >> > > > >> I was thinking can we add another operator in front of the input > port > > > that > > > >> has ITERATION_WINDOW_COUNT set. The new additional operator will > have > > > >> property whose value will be set equal to ITERATION_WINDOW_COUNT > and > > it > > > >> will be responsible for caching the data for those many windows and > > > >> delaying the data. This operator can act as unifier cum iterator > > > operator. > > > >> For this you may not need any external storage agent as platform > > > >> checkpointing should help you here. > > > >> > > > >> We are doing something similar for Sliding window. > > > >> > > > >> Thanks > > > >> -Gaurav > > > >> > > > >> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> > > > wrote: > > > >> > > > >> > Hi all, > > > >> > > > > >> > One current disadvantage of Apex is the inability to do iterations > > and > > > >> > machine learning algorithms because we don't allow loops in the > > > >> application > > > >> > DAG (hence the name DAG). I am proposing that we allow loops in > the > > > >> DAG if > > > >> > the loop advances the window ID by a configured amount. A JIRA > > ticket > > > >> has > > > >> > been created: > > > >> > > > > >> > https://malhar.atlassian.net/browse/APEX-60 > > > >> > > > > >> > I have started this work in my fork at > > > >> > https://github.com/davidyan74/incubator-apex-core/tree/APEX-60. > > > >> > > > > >> > The current progress is that a simple test case works. Major work > > > still > > > >> > needs to be done with respect to recovery and partitioning. > > > >> > > > > >> > The value ITERATION_WINDOW_COUNT is an attribute to an input port > of > > > an > > > >> > operator. If the value of the attribute is greater than or equal > to > > > 1, > > > >> any > > > >> > tuples sent to the input port are treated to be > > ITERATION_WINDOW_COUNT > > > >> > windows ahead of what they are. > > > >> > > > > >> > For recovery, we will need to checkpoint all the tuples between > > ports > > > >> with > > > >> > the to replay the looped tuples. During the recovery, if the > > operator > > > >> has > > > >> > an input port, with ITERATION_WINDOW_COUNT=2, is recovering from > > > >> checkpoint > > > >> > window 14, the tuples for that input port from window 13 and > window > > 14 > > > >> need > > > >> > to be replayed to be treated as window 15 and window 16 > respectively > > > >> (13+2 > > > >> > and 14+2). > > > >> > > > > >> > In other words, we need to store all the tuples from window with > ID > > > >> > committedWindowId minus ITERATION_WINDOW_COUNT for recovery and > > purge > > > >> the > > > >> > tuples earlier than that window. > > > >> > We can optimize this by only storing the tuples for > > > >> ITERATION_WINDOW_COUNT > > > >> > windows prior to any checkpoint. > > > >> > > > > >> > For that, we need a storage mechanism for the tuples. Chandni > > already > > > >> has > > > >> > something that fits this usage case in Apex Malhar. The class is > > > >> > IdempotentStorageManager. In order for this to be used in Apex > > core, > > > we > > > >> > need to deprecate the class in Apex Malhar and move it to Apex > Core. > > > >> > > > > >> > A JIRA ticket has been created for this particular work: > > > >> > > > > >> > https://malhar.atlassian.net/browse/APEX-128 > > > >> > > > > >> > Some of the above has been discussed among Thomas, Chetan, > Chandni, > > > and > > > >> > myself. > > > >> > > > > >> > For partitioning, we have not started any discussion or > > brainstorming. > > > >> We > > > >> > appreciate any feedback on this and any other aspect related to > > > >> supporting > > > >> > iterations in general. > > > >> > > > > >> > Thanks! > > > >> > > > > >> > David > > > >> > > > > >> > > > > > > > > > > > > > >