Could you please let me know your preference? Mine is solution 3: add a StorageAgent that creates an empty file, and use this storage agent for stateless leaf operators.
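To make the empty-file idea concrete, here is a minimal sketch in Python (chosen for brevity; it is not Apex code). The class name `EmptyFileStorageAgent`, its method names, and the hex file naming are assumptions made for illustration and are not the actual StorageAgent interface. The point is only that a zero-byte marker per window is enough to recover the checkpointed window ids of a stateless operator on relaunch.

```python
import os
import tempfile


class EmptyFileStorageAgent:
    """Hypothetical agent: a checkpoint is a zero-byte marker file."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _operator_dir(self, operator_id):
        return os.path.join(self.base_dir, str(operator_id))

    def save(self, operator_id, window_id):
        # No operator state is serialized; the file name alone
        # records that this window was checkpointed.
        op_dir = self._operator_dir(operator_id)
        os.makedirs(op_dir, exist_ok=True)
        with open(os.path.join(op_dir, format(window_id, "x")), "w"):
            pass

    def window_ids(self, operator_id):
        # Recover the checkpointed window ids from the marker file names.
        op_dir = self._operator_dir(operator_id)
        if not os.path.isdir(op_dir):
            return []
        return sorted(int(name, 16) for name in os.listdir(op_dir))

    def delete(self, operator_id, window_id):
        # Purge a marker once its window is no longer needed.
        op_dir = self._operator_dir(operator_id)
        os.remove(os.path.join(op_dir, format(window_id, "x")))


def demo():
    with tempfile.TemporaryDirectory() as base:
        agent = EmptyFileStorageAgent(base)
        agent.save(7, 100)
        agent.save(7, 160)
        size = os.path.getsize(os.path.join(base, "7", format(160, "x")))
        agent.delete(7, 100)
        return agent.window_ids(7), size
```

With markers like these, the relaunch logic could treat a stateless leaf operator like any other operator when looking up its latest checkpoint, instead of deriving a window id from the current timestamp.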
- Tushar.

On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:

> Thank you all for the feedback.
>
> Some useful output operators can be stateless: they push the data
> received in a window to the output store. Examples are
> KafkaOutputOperator/JDBCOutputOperator, or output stores where writes
> are idempotent, which covers most key-value stores.
>
> I was going to use the existing logic to compute the committedWindowId,
> with the addition of a few steps explained below.
>
> solution-1
> - Calculate committedWindow with leaf operator checkpoints set to the
>   current timestamp (current behaviour).
> - Update the leaf operators' recoveryWindowId to committedWindowId.
> - Calculate committedWindow again. This step is required because the
>   downstream operator's recoveryWindowId has been reduced, so we may
>   have to adjust the recoveryWindowId of upstream operators.
>
> This will prevent stateless leaf operators from starting at the current
> timestamp, reducing the amount of data loss. However, given the concern
> raised by Bhupesh about the last stateless operator being slow, the
> solution suggested by Vlad is sufficient.
>
> solution-1
> - As explained above. If a little loss is acceptable, we could go with
>   this approach.
> solution-2
> - Fail validation if the last operator is stateless in the
>   AT_LEAST_ONCE scenario, as suggested by Vlad. This could break
>   backward compatibility, as old applications will fail to launch.
> solution-3
> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>
> Let me know about your preference.
>
> Regards,
> - Tushar.
>
>
> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>
>> For a long chain of stateless operators at the end of a DAG, it is
>> possible that the time to propagate the end window to a leaf operator
>> is greater than the time for a checkpoint to be persisted in HDFS.
>>
>> If an at-least-once processing guarantee is necessary, the leaf
>> operators should not be STATELESS.
>> Will invalidating a DAG that has one or more leaf operators marked as
>> STATELESS with AT_LEAST_ONCE processing solve APEXCORE-619? It is not
>> the best solution, but I think it is sufficient for the described
>> scenario.
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 3/2/17 08:43, Thomas Weise wrote:
>>
>>> Good point, that's correct for a stateless leaf operator (an operator
>>> that does not have downstream operators). The minimum of the upstream
>>> checkpoints can be higher than the last windowId seen by the leaf
>>> operator. That has low probability, though, because it would mean the
>>> time it took for the checkpoint to become visible in HDFS is less
>>> than the propagation of endWindow downstream.
>>>
>>> It's also not a problem for an intermediate stateless operator,
>>> because the downstream checkpoint will inform the recovery windowId.
>>> Most of the time stateless operators are intermediate.
>>>
>>> Leaf operators are the output operators. I suspect in the original
>>> scenario it was a console output operator? Useful output operators
>>> usually won't be stateless; they have to track state to interact with
>>> the external system correctly. I'm bringing this up for adequate
>>> cost/benefit analysis.
>>>
>>> In the absence of a stateful downstream operator, you only have the
>>> committed windowId, which is essentially a checkpointing watermark.
>>> On application restart it has to be recomputed from the checkpoints
>>> available, and does not cover the scenario Tushar reported
>>> originally.
>>>
>>> Saving the committed windowId comes at a cost: it would have to be
>>> written to the journal before operators are notified. Care has been
>>> taken to not write unnecessarily to the journal, as it is blocking
>>> I/O, and in this case the frequency depends on the order of arrival
>>> of checkpoint notifications from operators. We also don't want to
>>> delay the committedWindow notification, as that would introduce
>>> latency.
>>>
>>> Thomas
>>>
>>>
>>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>>
>>>> What if all operators complete their first checkpoints, but the
>>>> stateless operator has not crossed the first checkpoint window when
>>>> the DAG crashes? If we try to figure out the recovery checkpoint
>>>> now, we might conclude that checkpoint 1 is the point to start from,
>>>> and we may miss some data that was still to be processed by the
>>>> stateless operator. Probably in this case at-least-once is also not
>>>> guaranteed?
>>>>
>>>> ~ Bhupesh
>>>>
>>>> _______________________________________________________
>>>>
>>>> Bhupesh Chawda
>>>>
>>>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>>>>
>>>> www.datatorrent.com | apex.apache.org
>>>>
>>>>
>>>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Dummy checkpoints, continuously writing the committed window id,
>>>>> and the like all introduce overhead that is probably not needed.
>>>>>
>>>>> All the information to derive what we need is likely available, and
>>>>> IMO the discussion should be on what is the correct way of using
>>>>> it. I will have a look when I get to it as well.
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sand...@datatorrent.com> wrote:
>>>>>
>>>>>> Instead of treating the stateless operator in a special way and
>>>>>> missing corner cases, just have a dummy checkpoint; then there is
>>>>>> no need to handle corner cases.
>>>>>>
>>>>>> There is a name for this solution:
>>>>>> https://en.wikipedia.org/wiki/Null_Object_pattern
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pra...@datatorrent.com> wrote:
>>>>>>
>>>>>>> There is code in various places that deals with stateless
>>>>>>> operators in a special way even though a physical checkpoint does
>>>>>>> not exist on the disk. It is probably a matter of applying a
>>>>>>> similar thought process/logic correctly here.
>>>>>>>
>>>>>>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com> wrote:
>>>>>>>
>>>>>>>> Hmm! The fact that commitWindowId has moved up (right now in the
>>>>>>>> memory of Stram) should mean that a complete set of checkpoints
>>>>>>>> is available, i.e. commitWindowId can be derived. Let's say that
>>>>>>>> the next checkpoint window also gets checkpointed across the
>>>>>>>> app, and commitWindowId is in memory but not written to
>>>>>>>> stram-state yet; then upon relaunch the latest commitWindowId
>>>>>>>> should get computed correctly.
>>>>>>>>
>>>>>>>> This may be just about setting stateless operators to
>>>>>>>> commitWindowId on re-launch? aka bug/feature?
>>>>>>>>
>>>>>>>> Thks
>>>>>>>> Amol
>>>>>>>>
>>>>>>>>
>>>>>>>> E:a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
>>>>>>>>
>>>>>>>> www.datatorrent.com | apex.apache.org
>>>>>>>>
>>>>>>>> Join us at Apex Big Data World-San Jose
>>>>>>>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!
>>>>>>>>
>>>>>>>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>>>>>>>
>>>>>>>>> Do we need to save committedWindowId? Can't it be computed from
>>>>>>>>> the existing checkpoints by walking through the DAG? We
>>>>>>>>> probably do this anyway, and I suspect there is a minor bug
>>>>>>>>> somewhere in there. If an operator is stateless, you could
>>>>>>>>> assume its checkpoint is long max for the sake of computation
>>>>>>>>> and compute the committed window to be the lowest common
>>>>>>>>> checkpoint. If they are all stateless and you end up with long
>>>>>>>>> max, you can start with a window id that reflects the current
>>>>>>>>> timestamp.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:
>>>>>>>>>
>>>>>>>>>> CommitWindowId could be computed from the existing
>>>>>>>>>> checkpoints. That solution still needs the purge to be done
>>>>>>>>>> after commitWindowId is confirmed to be saved in Stram state.
>>>>>>>>>> Without this, the commitWindowId computed from the checkpoints
>>>>>>>>>> may have some checkpoints missing.
>>>>>>>>>>
>>>>>>>>>> Thks
>>>>>>>>>> Amol
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can't the committedWindowId be calculated by looking at the
>>>>>>>>>>> physical plan and the existing checkpoints?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Help Needed for APEXCORE-619
>>>>>>>>>>>>
>>>>>>>>>>>> Issue: When an application is relaunched after a long time
>>>>>>>>>>>> with stateless operators at the end of the DAG, the
>>>>>>>>>>>> stateless operators start with a very high windowId. In this
>>>>>>>>>>>> case the stateless operator ignores all the data received
>>>>>>>>>>>> until the upstream operator catches up with it. This breaks
>>>>>>>>>>>> the *at-least-once* guarantee on relaunch of the operator,
>>>>>>>>>>>> or when the master is killed and the application is
>>>>>>>>>>>> restarted.
>>>>>>>>>>>>
>>>>>>>>>>>> Solutions:
>>>>>>>>>>>> - Fix the windowId for stateless leaf operators from the
>>>>>>>>>>>>   upstream operator.
>>>>>>>>>>>>   But it has some issues when we have a join with two
>>>>>>>>>>>>   upstream operators at different windowIds. If we set the
>>>>>>>>>>>>   windowId to min(upstream windowId), then we need to
>>>>>>>>>>>>   recalculate the new recovery window ids for the upstream
>>>>>>>>>>>>   paths from this operator.
>>>>>>>>>>>>
>>>>>>>>>>>> - Another solution is to create an empty file in the
>>>>>>>>>>>>   checkpoint directory for stateless operators. This will
>>>>>>>>>>>>   help us identify the checkpoints of stateless operators
>>>>>>>>>>>>   during relaunch instead of computing them from the latest
>>>>>>>>>>>>   timestamp.
>>>>>>>>>>>>
>>>>>>>>>>>> - Bring the entire DAG to committedWindowId. This could be
>>>>>>>>>>>>   achieved by writing committedWindowId to a journal. We
>>>>>>>>>>>>   need to make sure that we are not purging the checkpointed
>>>>>>>>>>>>   state until the committedWindowId is saved in the journal.
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know your thoughts on this and your preferred
>>>>>>>>>>>> solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> -Tushar.
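
The two-pass recomputation described as solution-1 earlier in this thread can be sketched roughly as follows, in Python for brevity. This is only an illustration under stated assumptions, not the Stram implementation: the graph model, the function names, the example DAG, and the `RESTART_FROM_INITIAL_STATE` marker are all hypothetical. A stateless operator is modelled as having no checkpoints, and a stateless leaf falls back to a supplied window id (first the timestamp-derived one, then the committed one).

```python
import math

RESTART_FROM_INITIAL_STATE = 0  # hypothetical marker, not an Apex constant


def recovery_windows(downstream, checkpoints, leaf_fallback):
    """Return op -> recovery window id.

    downstream: op -> list of operators it feeds.
    checkpoints: op -> list of checkpointed window ids, or None if stateless.
    leaf_fallback: window id assigned to a stateless leaf operator.
    """
    recovery = {}

    def visit(op):
        if op in recovery:
            return recovery[op]
        # An operator must not recover later than any operator it feeds.
        bound = min((visit(d) for d in downstream.get(op, [])), default=math.inf)
        own = checkpoints[op]
        if own is None:
            # Stateless: follow downstream, or use the fallback at a leaf.
            window = leaf_fallback if bound == math.inf else bound
        else:
            usable = [w for w in own if w <= bound]
            window = max(usable) if usable else RESTART_FROM_INITIAL_STATE
        recovery[op] = window
        return window

    for op in checkpoints:
        visit(op)
    return recovery


def two_pass(downstream, checkpoints, timestamp_window):
    # Pass 1: stateless leaves start at the timestamp-derived window.
    first = recovery_windows(downstream, checkpoints, timestamp_window)
    committed = min(first.values())
    # Pass 2: stateless leaves start at the committed window instead,
    # which may pull upstream recovery windows back as well.
    second = recovery_windows(downstream, checkpoints, committed)
    return first, committed, second


def demo():
    downstream = {"input": ["parse", "store"], "parse": ["console"]}
    checkpoints = {
        "input": [60, 100, 120],
        "parse": [60, 100, 120],
        "store": [60, 100],
        "console": None,  # stateless leaf
    }
    return two_pass(downstream, checkpoints, timestamp_window=10**12)
```

In this example the first pass leaves `console` at the timestamp-derived window while `parse` recovers at 120, so `console` would ignore data until upstream catches up. After the second pass both recover at 100, which shows why the upstream recovery window has to be adjusted once the leaf is lowered.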