I have opened a pull request #490 to handle this scenario by implementing option 3.
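Option 3, as described further down the thread, records checkpoints for leaf stateless operators through a storage agent that only creates an empty file. The following is a minimal sketch of that idea in plain Java. It is not the code in #490. `CheckpointStore` and `EmptyFileCheckpointStore` are hypothetical names, the interface only loosely resembles a checkpoint storage agent, and a local directory stands in for HDFS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified checkpoint-store interface (not the Apex StorageAgent API).
interface CheckpointStore {
  void save(Object state, int operatorId, long windowId);

  Object load(int operatorId, long windowId);

  void delete(int operatorId, long windowId);

  List<Long> getWindowIds(int operatorId);
}

// Marks each checkpoint of a stateless operator with an empty file at
// <root>/<operatorId>/<windowId>; no operator state is ever serialized.
final class EmptyFileCheckpointStore implements CheckpointStore {
  private final Path root;

  EmptyFileCheckpointStore(Path root) {
    this.root = root;
  }

  private Path dir(int operatorId) {
    return root.resolve(Integer.toString(operatorId));
  }

  @Override
  public void save(Object state, int operatorId, long windowId) {
    try {
      Files.createDirectories(dir(operatorId));
      Path marker = dir(operatorId).resolve(Long.toString(windowId));
      if (Files.notExists(marker)) {
        Files.createFile(marker); // zero-length marker; 'state' is ignored
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public Object load(int operatorId, long windowId) {
    return null; // nothing to restore for a stateless operator
  }

  @Override
  public void delete(int operatorId, long windowId) {
    try {
      Files.deleteIfExists(dir(operatorId).resolve(Long.toString(windowId)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // The marker file names are the checkpointed window ids, returned in ascending order.
  @Override
  public List<Long> getWindowIds(int operatorId) {
    List<Long> ids = new ArrayList<>();
    if (Files.notExists(dir(operatorId))) {
      return ids;
    }
    try (DirectoryStream<Path> markers = Files.newDirectoryStream(dir(operatorId))) {
      for (Path marker : markers) {
        ids.add(Long.parseLong(marker.getFileName().toString()));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Collections.sort(ids);
    return ids;
  }
}
```

Each `save` is a file create and each `delete` a file removal; on HDFS those are exactly the name node operations Thomas raises as the cost of this approach.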
-Tushar.

On Tue, Mar 14, 2017 at 7:46 PM, Thomas Weise <t...@apache.org> wrote:
> The JDBC operator was an example of a valid use case where no state needs to be saved in a checkpoint (it is saved to the database instead). We cannot break such use cases, and neither should the application developer be exposed to this, especially not in a backward-incompatible fashion.
>
> I think that the platform/operators need to handle this. Perhaps we can look at all the "stateless" operators, see which ones really depend on at-least-once processing (to produce exactly-once results), and then change the remaining ones to have a default processing mode of at-most-once? That would require separate annotation support to indicate a default processing mode for an operator.
>
> Thanks,
> Thomas
>
> On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tus...@datatorrent.com> wrote:
> > Hi Vlad,
> >
> > I am more worried about backward compatibility. Most new users will not be able to launch the simple applications, such as PiDemo and WordCount, that are the starting point for learning Apex. We could provide a way for users who are more worried about name node operations to not use the empty-file storage agent for leaf stateless operators.
> >
> > - Tushar.
> >
> > On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> > > I would prefer to go with option 2 (and maybe reuse the -force flag to allow launching applications that do not validate due to the newly introduced rule). I am not sure that it is OK to outsmart the application designer and force a stateless operator to become stateful.
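Option 2 amounts to a DAG validation rule. Below is a minimal sketch, assuming hypothetical `OperatorInfo` and `ProcessingMode` stand-ins rather than the Apex API, and modelling the `-force` override Vlad mentions as a plain boolean: a leaf operator (one with no downstream operators) that is stateless and runs AT_LEAST_ONCE fails validation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the operator metadata a validator would inspect.
enum ProcessingMode { AT_LEAST_ONCE, AT_MOST_ONCE, EXACTLY_ONCE }

final class OperatorInfo {
  final String name;
  final boolean stateless;
  final ProcessingMode mode;

  OperatorInfo(String name, boolean stateless, ProcessingMode mode) {
    this.name = name;
    this.stateless = stateless;
    this.mode = mode;
  }
}

final class StatelessLeafRule {
  // Names of leaf operators (no downstream operators) that are stateless but AT_LEAST_ONCE.
  static List<String> violations(List<OperatorInfo> operators, Map<String, Set<String>> downstream) {
    List<String> bad = new ArrayList<>();
    for (OperatorInfo op : operators) {
      boolean leaf = downstream.getOrDefault(op.name, Collections.emptySet()).isEmpty();
      if (leaf && op.stateless && op.mode == ProcessingMode.AT_LEAST_ONCE) {
        bad.add(op.name);
      }
    }
    return bad;
  }

  // Option 2: reject the DAG unless the (hypothetical) force override is set.
  static void validate(List<OperatorInfo> operators, Map<String, Set<String>> downstream, boolean force) {
    List<String> bad = violations(operators, downstream);
    if (!bad.isEmpty() && !force) {
      throw new IllegalStateException(
          "Stateless leaf operators cannot provide AT_LEAST_ONCE on recovery: " + bad);
    }
  }
}
```

An operator whose processing mode defaults to AT_MOST_ONCE, as Thomas proposes for operators such as the console operator, passes this check without any flag.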
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > > *Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017*
> > >
> > > On 3/10/17 07:38, Thomas Weise wrote:
> > >
> > > +1
> > >
> > > But keep in mind it will cause unnecessary name node operations, so it would be good to use it only when it is really needed (i.e. when the operator isn't really stateless but stores its state somewhere else).
> > >
> > > Can we look at optimizing the behavior for "stateless" operators that are really stateless? For example, should the console operator be AT_MOST_ONCE by default?
> > >
> > > On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > >
> > > My preference is also for option 3. It looks clean and simple to implement.
> > >
> > > ~ Bhupesh
> > >
> > > _______________________________________________________
> > >
> > > Bhupesh Chawda
> > >
> > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > > www.datatorrent.com | apex.apache.org
> > >
> > > On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
> > >
> > > Can you please let me know your preference? My preference is for solution 3: adding a StorageAgent that creates an empty file, and using this storage agent for leaf stateless operators.
> > >
> > > - Tushar.
> > >
> > > On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
> > >
> > > Thank you all for the feedback.
> > >
> > > Some useful output operators can be stateless; they push data received in a window to an output store.
> > > For example, KafkaOutputOperator/JDBCOutputOperator, or output stores where writes are idempotent, which covers most key-value stores.
> > >
> > > I was going to use the existing logic to compute the committedWindowId, with the addition of a few steps explained below.
> > >
> > > solution-1
> > > - Calculate committedWindow with leaf operator checkpoints set to the current timestamp (current behaviour).
> > > - Update the leaf operators' recoveryWindowId to committedWindowId.
> > > - Calculate committedWindow again. This step is required because, as the downstream operator's recoveryWindowId is reduced, we may have to adjust the recoveryWindowId of upstream operators.
> > >
> > > This will prevent leaf stateless operators from starting at the current timestamp, hence reducing the amount of data loss. But given the concern Bhupesh raised about the last stateless operator being slow, the solution suggested by Vlad is sufficient.
> > >
> > > solution-1
> > > - As explained above. If only a little loss is expected, we could go with this approach.
> > > solution-2
> > > - Fail validation if the last operator is stateless in the AT_LEAST_ONCE scenario, as suggested by Vlad. This could break backward compatibility, as old applications will fail to launch.
> > > solution-3
> > > - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
> > >
> > > Let me know your preference.
> > >
> > > Regards,
> > > - Tushar.
> > >
> > > On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
> > >
> > > For a long chain of stateless operators at the end of a DAG, it is possible that the time to propagate the end window to a leaf operator is greater than the time for a checkpoint to be persisted in HDFS.
> > >
> > > If an at-least-once processing guarantee is necessary, the leaf operators should not be STATELESS.
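Tushar's solution-1 steps can be sketched as a recovery computation that walks the DAG from the leaves upstream. This is an illustration under assumptions, not the Stram code: operators are identified by hypothetical string names, `checkpoints` lists each stateful operator's available checkpoint window ids (stateless operators have none), `FROM_SCRATCH = -1` is a hypothetical "no usable checkpoint" marker, and the third step is read as repeating the recomputation until the committed window stops moving.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class Solution1Planner {
  // Hypothetical marker for "no usable checkpoint: start from scratch".
  static final long FROM_SCRATCH = -1L;

  // Returns the recovery window per operator; 'operators' must be non-empty.
  static Map<String, Long> plan(Map<String, List<Long>> checkpoints,
                                Map<String, List<String>> downstream,
                                Iterable<String> operators,
                                long currentWindowId) {
    long statelessLeafWindow = currentWindowId; // step 1: current behaviour
    while (true) {
      Map<String, Long> recovery = new HashMap<>();
      for (String op : operators) {
        recover(op, checkpoints, downstream, statelessLeafWindow, recovery);
      }
      long committed = Collections.min(recovery.values());
      if (committed == statelessLeafWindow) {
        return recovery; // stateless leaves already sit at the committed window
      }
      statelessLeafWindow = committed; // step 2, then recompute (step 3)
    }
  }

  private static long recover(String op,
                              Map<String, List<Long>> checkpoints,
                              Map<String, List<String>> downstream,
                              long statelessLeafWindow,
                              Map<String, Long> memo) {
    Long known = memo.get(op);
    if (known != null) {
      return known;
    }
    List<String> children = downstream.getOrDefault(op, Collections.emptyList());
    long bound = Long.MAX_VALUE; // earliest window a downstream operator needs again
    for (String child : children) {
      bound = Math.min(bound, recover(child, checkpoints, downstream, statelessLeafWindow, memo));
    }
    List<Long> own = checkpoints.getOrDefault(op, Collections.emptyList());
    long result;
    if (own.isEmpty()) {
      // Stateless: a leaf uses the pinned window, an intermediate follows downstream.
      result = children.isEmpty() ? statelessLeafWindow : bound;
    } else {
      // Stateful: latest own checkpoint that is not newer than the downstream bound.
      result = FROM_SCRATCH;
      for (long c : own) {
        if (c <= bound && c > result) {
          result = c;
        }
      }
    }
    memo.put(op, result);
    return result;
  }
}
```

For example, with `input` feeding a stateful `a` and a stateful leaf `b`, and `a` feeding a stateless leaf `console`, checkpoints input {10, 20, 30}, a {10, 30}, b {20} and a current window of 1000, the first pass pins `console` at 1000 and yields a committed window of 20; later passes pull `console` back, which pulls `a` and `input` back to checkpoint 10, ending at input 10, a 10, b 20, console 10.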
> > >
> > > Will invalidating a DAG that has one or more leaf operators marked as STATELESS with AT_LEAST_ONCE processing solve APEXCORE-619? It is not the best solution, but I think it is sufficient for the described scenario.
> > >
> > > Thank you,
> > >
> > > Vlad
> > >
> > > On 3/2/17 08:43, Thomas Weise wrote:
> > >
> > > Good point, that's correct for a stateless leaf operator (an operator that does not have downstream operators). The minimum of the upstream checkpoints can be higher than the last windowId seen by the leaf operator. That has a low probability, though, because it would mean that the checkpoint took less time to become visible in HDFS than endWindow took to propagate downstream.
> > >
> > > It's also not a problem for an intermediate stateless operator, because the downstream checkpoint will inform the recovery windowId. Most of the time, stateless operators are intermediate.
> > >
> > > Leaf operators are the output operators. I suspect that in the original scenario it was a console output operator? Useful output operators usually won't be stateless; they have to track state to interact with the external system correctly. I'm bringing this up for an adequate cost/benefit analysis.
> > >
> > > In the absence of a stateful downstream operator, you only have the committed windowId, which is essentially a checkpointing watermark. On application restart it has to be recomputed from the available checkpoints, and that does not cover the scenario Tushar originally reported.
> > >
> > > Saving the committed windowId comes at a cost: it would have to be written to the journal before operators are notified.
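The ordering constraint here (persist the committed windowId before notifying operators and, as Amol and Tushar note elsewhere in the thread, before purging checkpoints), together with avoiding redundant journal writes, can be sketched as follows. The class and callbacks are hypothetical: the two `LongConsumer`s stand in for a blocking journal write and for committed-window notification plus purge.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongConsumer;

// Hypothetical tracker: the journal write happens only when the committed
// window advances, and always before operators are notified or checkpoints purged.
final class CommittedWindowTracker {
  private final Map<Integer, Long> latestCheckpoint = new HashMap<>();
  private final LongConsumer journal;        // assumed blocking, durable write
  private final LongConsumer notifyAndPurge; // committed notification + purge of older checkpoints
  private long committed = Long.MIN_VALUE;

  CommittedWindowTracker(Iterable<Integer> operatorIds, LongConsumer journal, LongConsumer notifyAndPurge) {
    for (int id : operatorIds) {
      latestCheckpoint.put(id, Long.MIN_VALUE); // no checkpoint reported yet
    }
    this.journal = journal;
    this.notifyAndPurge = notifyAndPurge;
  }

  void onCheckpoint(int operatorId, long windowId) {
    latestCheckpoint.merge(operatorId, windowId, Long::max);
    long candidate = Collections.min(latestCheckpoint.values());
    if (candidate <= committed) {
      return; // watermark did not move: skip the blocking journal write
    }
    journal.accept(candidate);        // 1. persist first, so a restart can trust it
    committed = candidate;
    notifyAndPurge.accept(candidate); // 2. only then notify and purge
  }

  long committedWindowId() {
    return committed;
  }
}
```

The journal is written once per advance of the watermark, not once per checkpoint notification, which is the kind of write-avoidance Thomas describes; the notification still waits on that one write, which is the latency he wants to avoid.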
> > >
> > > Care has been taken not to write to the journal unnecessarily, as it is blocking I/O, and in this case the frequency depends on the order in which checkpoint notifications arrive from operators. We also don't want to delay the committedWindow notification, as that would introduce latency.
> > >
> > > Thomas
> > >
> > > On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > >
> > > What if all operators complete their first checkpoints but the stateless operator has not crossed the first checkpoint window, and the DAG crashes? If we then try to figure out the recovery checkpoint, we might conclude that checkpoint 1 is the point to start from, and some data may never be processed by the stateless operator. Probably at-least-once is not guaranteed in this case either?
> > >
> > > ~ Bhupesh
> > >
> > > On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > Dummy checkpoints, continuously writing the committed window id, and the like all introduce overhead that is probably not needed.
> > >
> > > All the information to derive what we need is likely available, and IMO the discussion should be about the correct way of using it. I will have a look when I get to it as well.
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sand...@datatorrent.com> wrote:
> > >
> > > Instead of treating the stateless operator in a special way and missing corner cases, just have a dummy checkpoint; then there is no need to handle corner cases. There is a name for this solution: https://en.wikipedia.org/wiki/Null_Object_pattern
> > >
> > > On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pra...@datatorrent.com> wrote:
> > >
> > > There is code in various places that deals with stateless operators in a special way even though a physical checkpoint does not exist on disk. It is probably a matter of applying a similar thought process/logic correctly here.
> > >
> > > On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com> wrote:
> > >
> > > Hmm! The fact that commitWindowId has moved up (right now in Stram's memory) should mean that a complete set of checkpoints is available, i.e. commitWindowId can be derived. Let's say the next checkpoint window also gets checkpointed across the app, and commitWindowId is in memory but not yet written to the Stram state; then upon relaunch, the latest commitWindowId should still get computed correctly.
> > >
> > > This may just be about setting stateless operators to commitWindowId on relaunch? aka bug/feature?
> > >
> > > Thks
> > > Amol
> > >
> > > E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
> > > www.datatorrent.com | apex.apache.org
> > >
> > > On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
> > >
> > > Do we need to save committedWindowId? Can't it be computed from the existing checkpoints by walking through the DAG? We probably do this anyway, and I suspect there is a minor bug somewhere in there. If an operator is stateless, you could assume its checkpoint is Long.MAX_VALUE for the sake of computation and compute the committed window to be the lowest common checkpoint. If they are all stateless and you end up with Long.MAX_VALUE, you can start with a window id that reflects the current timestamp.
> > >
> > > Thanks
> > >
> > > On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:
> > >
> > > CommitWindowId could be computed from the existing checkpoints. That solution still needs the purge to be done after commitWindowId is confirmed to be saved in the Stram state. Without this, the commitWindowId computed from the checkpoints may have some checkpoints missing.
> > >
> > > Thks
> > > Amol
> > >
> > > On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
> > >
> > > Can't the committedWindowId be calculated by looking at the physical plan and the existing checkpoints?
> > >
> > > On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org> wrote:
> > >
> > > Help Needed for APEXCORE-619
> > >
> > > Issue: When an application is relaunched after a long time with stateless operators at the end of the DAG, the stateless operators start with a very high windowId. In this case the stateless operator ignores all the data received until the upstream operator catches up with it. This breaks the *at-least-once* guarantee when the operator is relaunched, or when the master is killed and the application is restarted.
> > >
> > > Solutions:
> > > - Fix the windowId for stateless leaf operators from the upstream operator. But this has some issues when we have a join with two upstream operators at different windowIds. If we set the windowId to min(upstream windowId), then we need to recalculate the recovery window ids for the upstream paths from this operator.
> > > - Another solution is to create an empty file in the checkpoint directory for stateless operators. This will help us identify the checkpoints of stateless operators during relaunch, instead of computing from the latest timestamp.
> > > - Bring the entire DAG to committedWindowId. This could be achieved by writing committedWindowId to a journal. We need to make sure that we are not purging the checkpointed state until the committedWindowId is saved in the journal.
> > >
> > > Let me know your thoughts on this and your preferred solution.
> > >
> > > Regards,
> > > -Tushar.
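Pramod's suggestion of deriving the committed window from existing checkpoints, with stateless operators counted as Long.MAX_VALUE, can be sketched as below. Assumptions: "lowest common checkpoint" is read as the minimum over operators of each operator's latest checkpoint, and the operator names and the `currentTimestampWindowId` supplier are hypothetical (the checkpoint lists would come from scanning checkpoint storage).

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

final class CommittedWindowFromCheckpoints {
  // committed = min over operators of their latest checkpoint; a stateless operator
  // (no checkpoint on disk) counts as Long.MAX_VALUE so it never lowers the minimum.
  static long compute(Map<String, List<Long>> checkpoints,
                      Iterable<String> operators,
                      LongSupplier currentTimestampWindowId) {
    long committed = Long.MAX_VALUE;
    for (String op : operators) {
      List<Long> own = checkpoints.get(op);
      long latest = (own == null || own.isEmpty()) ? Long.MAX_VALUE : Collections.max(own);
      committed = Math.min(committed, latest);
    }
    // All operators stateless: fall back to a window id that reflects "now".
    return committed == Long.MAX_VALUE ? currentTimestampWindowId.getAsLong() : committed;
  }
}
```

This minimum can be ahead of the last window a stateless leaf actually processed, which is the gap Thomas and Bhupesh point out earlier in the thread.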