Some more thoughts.

We would need a way to know that the data for a particular window needs to
be saved for replay in case of failure, namely the windows from
checkpoint-iteration to checkpoint that you mentioned, so we can save the
data for only those windows. The engine could provide this information in
some manner. A related ticket is https://malhar.atlassian.net/browse/APEX-78
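
To make the window bookkeeping concrete, here is a small sketch of the
arithmetic described in this thread. The class and method names are
hypothetical helpers, not Apex APIs; it only illustrates which windows
would need to be retained at a checkpoint and which window each one is
replayed as:

```java
// Hypothetical helper (not an Apex API): given a checkpoint window and the
// ITERATION_WINDOW_COUNT, compute which windows must be retained for replay
// and which window each retained one is replayed as.
public final class ReplayWindowRange {

  // Windows to retain at checkpoint W are the n windows ending at W,
  // i.e. (W - n, W] inclusive of W.
  public static long[] windowsToRetain(long checkpointWindowId, int iterationWindowCount) {
    long[] windows = new long[iterationWindowCount];
    for (int i = 0; i < iterationWindowCount; i++) {
      windows[i] = checkpointWindowId - iterationWindowCount + 1 + i;
    }
    return windows;
  }

  // A retained window is replayed with its ID advanced by ITERATION_WINDOW_COUNT.
  public static long replayedAs(long savedWindowId, int iterationWindowCount) {
    return savedWindowId + iterationWindowCount;
  }

  public static void main(String[] args) {
    // Checkpoint window 14 with ITERATION_WINDOW_COUNT=2: retain windows
    // 13 and 14, replayed as 15 and 16 respectively (13+2 and 14+2).
    for (long w : windowsToRetain(14, 2)) {
      System.out.println(w + " -> " + replayedAs(w, 2));
    }
  }
}
```

This matches the worked example later in the thread (checkpoint 14,
ITERATION_WINDOW_COUNT=2, replay of 13 and 14 as 15 and 16).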

There may be advantages to storing data on the source side (loop
originator), like a single source of truth when the same source is
connected to multiple operators (loop destinations), but it could introduce
complications in implementation, since the delaying of data now needs to
happen on the source side, and if the same stream goes to both a loop
destination and a non-loop destination, the same data needs to be sent out
in two different windows. Does anyone have any other thoughts on this?

For partitioning, it seems like it should work correctly if the operators
in the loop path, including the loop destination and loop source, are all
restored to the same checkpoint.
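
One way to express that recovery constraint as code (a hypothetical helper,
and the choice of the minimum checkpoint as the common restore point is my
assumption, not something specified in this thread):

```java
// Sketch of the loop-path recovery constraint: all operators on the loop
// path must roll back to a single common checkpoint. A natural candidate
// is the minimum checkpoint window across the path, since every operator
// is guaranteed to have state at or after that window.
public final class LoopRecoverySketch {

  static long commonRecoveryWindow(long... checkpointWindowIds) {
    long min = Long.MAX_VALUE;
    for (long w : checkpointWindowIds) {
      min = Math.min(min, w);
    }
    return min;
  }

  public static void main(String[] args) {
    // Loop source checkpointed at 16, an intermediate operator at 15, and
    // the loop destination at 14: restore the whole path to window 14.
    System.out.println(commonRecoveryWindow(16, 15, 14));
  }
}
```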

You may have already thought about the changes needed for the idempotent
storage manager, such as being able to progressively save data.

Thanks


On Wed, Sep 16, 2015 at 2:22 PM, Pramod Immaneni <pra...@datatorrent.com>
wrote:

> This is a good idea. One thing I can think of is you could save the tuples
> on the side of the operator that is the loop originator (downstream
> operator) itself for the [checkpoint-iteration..checkpoint] windows, because
> they are going to be in persistent storage like HDFS and can be accessed
> by the loop destination (upstream operator) during recovery. The advantage
> of this is you don't have to do a special save on the destination side, but
> rather piggyback on the HDFS save features of the bufferserver itself.
>
> On Wed, Sep 16, 2015 at 1:44 PM, David Yan <da...@datatorrent.com> wrote:
>
>> Hi all,
>>
>> One current disadvantage of Apex is the inability to support iterations,
>> and hence many machine learning algorithms, because we don't allow loops
>> in the application DAG (hence the name DAG).  I am proposing that we
>> allow loops in the DAG if the loop advances the window ID by a configured
>> amount.  A JIRA ticket has been created:
>>
>> https://malhar.atlassian.net/browse/APEX-60
>>
>> I have started this work in my fork at
>> https://github.com/davidyan74/incubator-apex-core/tree/APEX-60.
>>
>> The current progress is that a simple test case works.  Major work still
>> needs to be done with respect to recovery and partitioning.
>>
>> The value ITERATION_WINDOW_COUNT is an attribute on an input port of an
>> operator.  If the value of the attribute is greater than or equal to 1,
>> any tuples sent to the input port are treated as being
>> ITERATION_WINDOW_COUNT windows ahead of their original windows.
>>
>> For recovery, we will need to checkpoint all the tuples between the ports
>> in order to replay the looped tuples.  During recovery, if an operator
>> with an input port that has ITERATION_WINDOW_COUNT=2 is recovering from
>> checkpoint window 14, the tuples for that input port from window 13 and
>> window 14 need to be replayed and treated as window 15 and window 16
>> respectively (13+2 and 14+2).
>>
>> In other words, we need to store all the tuples starting from the window
>> with ID committedWindowId minus ITERATION_WINDOW_COUNT for recovery, and
>> purge the tuples from windows earlier than that.
>> We can optimize this by only storing the tuples for the
>> ITERATION_WINDOW_COUNT windows prior to any checkpoint.
>>
>> For that, we need a storage mechanism for the tuples.  Chandni already has
>> something that fits this use case in Apex Malhar.  The class is
>> IdempotentStorageManager.  In order for this to be used in Apex Core, we
>> need to deprecate the class in Apex Malhar and move it to Apex Core.
>>
>> A JIRA ticket has been created for this particular work:
>>
>> https://malhar.atlassian.net/browse/APEX-128
>>
>> Some of the above has been discussed among Thomas, Chetan, Chandni, and
>> myself.
>>
>> For partitioning, we have not started any discussion or brainstorming.  We
>> appreciate any feedback on this and any other aspect related to supporting
>> iterations in general.
>>
>> Thanks!
>>
>> David
>>
>
>
