Can you please let me know your preference? My preference is for solution
3, implemented by adding a StorageAgent that creates an empty file and
using this storage agent for stateless leaf operators.

- Tushar.


On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tus...@datatorrent.com>
wrote:

> Thank you all for the feedback.
>
> Some useful output operators can be stateless: they push the data
> received in a window to the output store. Examples are
> KafkaOutputOperator/JDBCOutputOperator, or output stores where
> writes are idempotent, which covers most of the key-value stores.
>
> I was going to use the existing logic to compute the committedWindowId,
> with the addition of a few steps explained below.
> solution-1
> - Calculate committedWindow with leaf operator checkpoints set to the
> current timestamp (current behaviour).
> - Update the leaf operators' recoveryWindowId to committedWindowId.
> - Calculate committedWindow again. This step is required because the
> downstream operator's recoveryWindowId has been reduced, so we may have
> to adjust the recoveryWindowId of upstream operators.
>
> This will prevent stateless leaf operators from starting at the current
> timestamp, reducing the amount of data lost. But as for the concern
> raised by Bhupesh about the last stateless operator being slow, the
> solution suggested by Vlad is sufficient.
>
> solution-1
> - As explained above. If only a little loss is expected we could go with
> this approach.
> solution-2
> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
> scenario, as suggested by Vlad.
>   This could break backward compatibility, as old applications will fail to
> launch.
> solution-3
> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>
> Let me know about your preference.
>
> Regards,
> - Tushar.
>
>
> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.ro...@datatorrent.com>
> wrote:
>
>> For a long chain of stateless operators at the end of a DAG, it is
>> possible that time to propagate the end window to a leaf operator is
>> greater than the time for a checkpoint to be persisted in HDFS.
>>
>> If at least once processing guarantee is necessary, the leaf operators
>> should not be STATELESS. Will invalidating a DAG that has one or more leaf
>> operators marked as STATELESS with AT_LEAST_ONCE processing solve
>> APEXCORE-619? It is not the best solution, but I think it is sufficient for
>> the described scenario.
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 3/2/17 08:43, Thomas Weise wrote:
>>
>>> Good point, that's correct for a stateless leaf operator (operator that
>>> does not have downstream operators). The minimum of upstream checkpoints
>>> can be higher than the last windowId seen by the leaf operator. Although
>>> that is a low probability, because it would mean the time it took for the
>>> checkpoint to become visible in HDFS is less than propagation of endWindow
>>> downstream.
>>>
>>> It's also not a problem for an intermediate stateless operator, because the
>>> downstream checkpoint will inform the recovery windowId. Most of the time
>>> stateless operators are intermediate.
>>>
>>> Leaf operators are the output operators. I suspect in the original scenario
>>> it was a console output operator? Useful output operators usually won't be
>>> stateless, they have to track state to interact with the external system
>>> correctly. I'm bringing this up for adequate cost/benefit analysis.
>>>
>>> In absence of stateful downstream operator, you only have the committed
>>> windowId, which is essentially a checkpointing watermark. On application
>>> restart it has to be recomputed from the checkpoints available, and does
>>> not cover the scenario Tushar reported originally.
>>>
>>> Saving committed windowId comes at a cost, it would have to be written to
>>> the journal before operators are notified. Care has been taken not to write
>>> unnecessarily to the journal, as it is blocking I/O and in this case the
>>> frequency depends on the order of arrival of checkpoint notifications from
>>> operators. We also don't want to delay committedWindow notification, as that
>>> would introduce latency.
>>>
>>> Thomas
>>>
>>>
>>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhup...@datatorrent.com>
>>> wrote:
>>>
>>>> What if all operators complete their first checkpoints but the stateless
>>>> operator could not cross the first checkpoint window, and the DAG crashed?
>>>> If we try to figure out the recovery checkpoint now, we might conclude that
>>>> checkpoint 1 is the point to start from, and we may miss some data getting
>>>> processed by the stateless operator. Probably in this case at-least-once is
>>>> also not guaranteed?
>>>>
>>>> ~ Bhupesh
>>>>
>>>>
>>>> _______________________________________________________
>>>>
>>>> Bhupesh Chawda
>>>>
>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>
>>>> www.datatorrent.com  |  apex.apache.org
>>>>
>>>>
>>>>
>>>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Dummy checkpoints, continuously writing committed window id and the like
>>>>> all introduce overhead that is probably not needed.
>>>>>
>>>>> All the information to derive what we need is likely available and IMO the
>>>>> discussion should be on what is the correct way of using it. I will have a
>>>>> look when I get to it as well.
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sand...@datatorrent.com>
>>>>> wrote:
>>>>>
>>>>>> Instead of treating the stateless operator in a special way and missing
>>>>>> corner cases, just have a dummy checkpoint, then there is no need to handle
>>>>>> corner cases.
>>>>>>
>>>>>> There is a name for this solution,
>>>>>> https://en.wikipedia.org/wiki/Null_Object_pattern
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pra...@datatorrent.com>
>>>>>> wrote:
>>>>>>
>>>>>>> There is code in various places that deals with stateless operators in a
>>>>>>> special way even though a physical checkpoint does not exist on the disk.
>>>>>>> It is probably a matter of applying similar thought process/logic correctly
>>>>>>> here.
>>>>>>>
>>>>>>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> hmm! the fact that commitWindowId has moved up (right now in memory of
>>>>>>>> Stram) should mean that a complete set of checkpoints is available, i.e.
>>>>>>>> commitWindowId can be derived. Let's say that the next checkpoint window
>>>>>>>> also gets checkpointed across the app, commitWindowId is in memory but not
>>>>>>>> written to stram-state yet, then upon relaunch the latest commitWindowId
>>>>>>>> should get computed correctly.
>>>>>>>>
>>>>>>>> This may be just about setting stateless operators to commitWindowId on
>>>>>>>> re-launch? aka bug/feature?
>>>>>>>>
>>>>>>>> Thks
>>>>>>>> Amol
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> E:a...@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*
>>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>>
>>>>>>>> *Join us at Apex Big Data World-San Jose
>>>>>>>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
>>>>>>>> [image: http://www.apexbigdata.com/san-jose-register.html]
>>>>>>>> <http://www.apexbigdata.com/san-jose-register.html>
>>>>>>>>
>>>>>>>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Do we need to save committedWindowId? Can't it be computed from existing
>>>>>>>>> checkpoints by walking through the DAG? We probably do this anyway and I
>>>>>>>>> suspect there is a minor bug somewhere in there. If an operator is
>>>>>>>>> stateless you could assume its checkpoint to be long max for the sake of
>>>>>>>>> computation and compute the committed window to be the lowest common
>>>>>>>>> checkpoint. If they are all stateless and you end up with long max you can
>>>>>>>>> start with a window id that reflects the current timestamp.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> CommitWindowId could be computed from the existing checkpoints. That
>>>>>>>>>> solution still needs purge to be done after commitWindowId is confirmed to
>>>>>>>>>> be saved in Stram state. Without this the commitWindowId computed from the
>>>>>>>>>> checkpoints may have some checkpoints missing.
>>>>>>>>>>
>>>>>>>>>> Thks
>>>>>>>>>> Amol
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> E:a...@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*
>>>>>>>>>> www.datatorrent.com  |  apex.apache.org
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can't the committedWindowId be calculated by looking at the physical plan
>>>>>>>>>>> and the existing checkpoints?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Help Needed for APEXCORE-619
>>>>>>>>>>>>
>>>>>>>>>>>> Issue: When an application is relaunched after a long time with stateless
>>>>>>>>>>>> operators at the end of the DAG, the stateless operators start with a very
>>>>>>>>>>>> high windowId. In this case the stateless operator ignores all the data
>>>>>>>>>>>> received until the upstream operator catches up with it. This breaks the
>>>>>>>>>>>> *at-least-once* guarantee on relaunch of the operator or when the master
>>>>>>>>>>>> is killed and the application is restarted.
>>>>>>>>>>>>
>>>>>>>>>>>> Solutions:
>>>>>>>>>>>> - Fix the windowId for stateless leaf operators from the upstream operator.
>>>>>>>>>>>> But it has some issues when we have a join with two upstream operators at
>>>>>>>>>>>> different windowIds. If we set the windowId to min(upstream windowId), then
>>>>>>>>>>>> we need to again recalculate the new recovery window ids for the upstream
>>>>>>>>>>>> paths from this operator.
>>>>>>>>>>>>
>>>>>>>>>>>> - Another solution is to create an empty file in the checkpoint directory
>>>>>>>>>>>> for stateless operators. This will help us to identify the checkpoints of
>>>>>>>>>>>> stateless operators during relaunch instead of computing from the latest
>>>>>>>>>>>> timestamp.
>>>>>>>>>>>>
>>>>>>>>>>>> - Bring the entire DAG to committedWindowId. This could be achieved by
>>>>>>>>>>>> writing committedWindowId in a journal. We need to make sure that we are
>>>>>>>>>>>> not purging the checkpointed state until the committedWindowId is saved in
>>>>>>>>>>>> the journal.
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know your thoughts on this and your preferred solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> -Tushar.
>>>>>>>>>>>>
>>>>>>
>>>>>>
>>
>
