The JDBC operator was an example of a valid use case where no state needs to be saved in a checkpoint (it is saved to the database instead). We cannot break such use cases, and the application developer should not be exposed to this either, especially not in a backward-incompatible fashion.
I think that the platform/operators need to handle this. Perhaps we can look at all the "stateless" operators and see which ones really depend on at-least-once processing (to produce exactly-once results) and then change the remaining ones to have a default processing mode of at-most-once? That would require separate annotation support to indicate a default processing mode for an operator.

Thanks,
Thomas

On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tus...@datatorrent.com> wrote:

> Hi Vlad,
>
> I am more worried about backward compatibility. Most new users will not be able to launch the simple applications that are the starting point for learning Apex, such as PiDemo and WordCount. We could provide a way for the user to not use the empty-file storage agent for leaf stateless operators if they are more worried about name node operations.
>
> - Tushar.
>
> On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>
>> I would prefer to go with option 2 (and maybe reuse the -force flag to allow launching applications that do not validate due to the newly introduced rule). I am not sure that it is OK to outsmart the application designer and force a stateless operator to become stateful.
>>
>> Thank you,
>>
>> Vlad
>>
>> *Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017*
>>
>> On 3/10/17 07:38, Thomas Weise wrote:
>>
>> +1
>>
>> But keep in mind it will cause unnecessary name node operations, and therefore it would be good to only use it when it is really needed (i.e. the operator in reality isn't stateless, it stores its state somewhere else).
>>
>> Can we look at optimizing the behavior for "stateless" operators that are really stateless? For example, should the console operator be AT_MOST_ONCE by default?
>>
>> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>
>> My preference is also for option 3. It looks clean and simple to implement.
>>
>> ~ Bhupesh
>>
>> _______________________________________________________
>>
>> Bhupesh Chawda
>>
>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>> www.datatorrent.com | apex.apache.org
>>
>> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
>>
>> Can you please let me know your preference? My preference is for solution 3, by adding a StorageAgent which creates an empty file, and using this storage agent for leaf stateless operators.
>>
>> - Tushar.
>>
>> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
>>
>> Thank you all for the feedback.
>>
>> Some useful output operators can be stateless: they push the data received in a window to the output store, for example KafkaOutputOperator/JDBCOutputOperator, or output stores where writes are idempotent, which covers most key-value stores.
>>
>> I was going to use the existing logic to compute the committedWindowId, with the addition of a few steps explained below.
>>
>> solution-1
>> - Calculate committedWindow with leaf operator checkpoints set to the current timestamp (current behaviour).
>> - Update the leaf operators' recoveryWindowId to committedWindowId.
>> - Calculate committedWindow again. This step is required because the downstream operator's recoveryWindowId is reduced, and hence we may have to adjust the recoveryWindowId of upstream operators.
>>
>> This will prevent leaf stateless operators from starting at the current timestamp, hence reducing the amount of data loss.
>> But, as per the concern raised by Bhupesh about the last stateless operator being slow, the solution suggested by Vlad is sufficient.
>>
>> solution-1
>> - As explained above. If only a little loss is expected, we could go with this approach.
>>
>> solution-2
>> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE scenario, as suggested by Vlad. This could break backward compatibility, as old applications will fail to launch.
>>
>> solution-3
>> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
>>
>> Let me know about your preference.
>>
>> Regards,
>> - Tushar.
>>
>> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.ro...@datatorrent.com> wrote:
>>
>> For a long chain of stateless operators at the end of a DAG, it is possible that the time to propagate the end window to a leaf operator is greater than the time for a checkpoint to be persisted in HDFS.
>>
>> If an at-least-once processing guarantee is necessary, the leaf operators should not be STATELESS. Will invalidating a DAG that has one or more leaf operators marked as STATELESS with AT_LEAST_ONCE processing solve APEXCORE-619? It is not the best solution, but I think it is sufficient for the described scenario.
>>
>> Thank you,
>>
>> Vlad
>>
>> On 3/2/17 08:43, Thomas Weise wrote:
>>
>> Good point, that's correct for a stateless leaf operator (an operator that does not have downstream operators). The minimum of the upstream checkpoints can be higher than the last windowId seen by the leaf operator. That has a low probability, though, because it would mean the time it took for the checkpoint to become visible in HDFS is less than the propagation of endWindow downstream.
>>
>> It's also not a problem for an intermediate stateless operator, because the downstream checkpoint will inform the recovery windowId.
>> Most of the time stateless operators are intermediate.
>>
>> Leaf operators are the output operators. I suspect in the original scenario it was a console output operator? Useful output operators usually won't be stateless; they have to track state to interact with the external system correctly. I'm bringing this up for adequate cost/benefit analysis.
>>
>> In the absence of a stateful downstream operator, you only have the committed windowId, which is essentially a checkpointing watermark. On application restart it has to be recomputed from the checkpoints available, and does not cover the scenario Tushar reported originally.
>>
>> Saving the committed windowId comes at a cost: it would have to be written to the journal before operators are notified. Care has been taken not to write unnecessarily to the journal, as it is blocking I/O, and in this case the frequency depends on the order of arrival of checkpoint notifications from operators. We also don't want to delay the committedWindow notification, as that would introduce latency.
>>
>> Thomas
>>
>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>>
>> What if all operators complete their first checkpoints, but the stateless operator could not cross the first checkpoint window, and the DAG crashed? If we try to figure out the recovery checkpoint now, we might conclude that checkpoint 1 is the point to start from, and we may miss some data that was still to be processed by the stateless operator. Probably in this case at-least-once is also not guaranteed?
>>
>> ~ Bhupesh
>>
>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> wrote:
>>
>> Dummy checkpoints, continuously writing the committed window id, and the like all introduce overhead that is probably not needed.
>>
>> All the information to derive what we need is likely available, and IMO the discussion should be on what is the correct way of using it. I will have a look when I get to it as well.
>>
>> Thanks,
>> Thomas
>>
>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sand...@datatorrent.com> wrote:
>>
>> Instead of treating the stateless operator in a special way and missing corner cases, just have a dummy checkpoint; then there is no need to handle corner cases.
>>
>> There is a name for this solution: https://en.wikipedia.org/wiki/Null_Object_pattern
>>
>> On Wed, Mar 1, 2017 at 2:52 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>
>> There is code in various places that deals with stateless operators in a special way even though a physical checkpoint does not exist on the disk. It is probably a matter of applying a similar thought process/logic correctly here.
>>
>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com> wrote:
>>
>> Hmm! The fact that commitWindowId has moved up (right now in the memory of Stram) should mean that a complete set of checkpoints is available, i.e. commitWindowId can be derived.
>> Let's say that the next checkpoint window also gets checkpointed across the app, and commitWindowId is in memory but not written to Stram state yet; then upon relaunch the latest commitWindowId should get computed correctly.
>>
>> This may be just about setting stateless operators to commitWindowId on re-launch? aka bug/feature?
>>
>> Thks
>> Amol
>>
>> E: a...@datatorrent.com | M: 510-449-2606 | Twitter: @amolhkekre
>> www.datatorrent.com | apex.apache.org
>>
>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>
>> Do we need to save committedWindowId? Can't it be computed from existing checkpoints by walking through the DAG? We probably do this anyway, and I suspect there is a minor bug somewhere in there. If an operator is stateless, you could assume its checkpoint to be long max for the sake of computation and compute the committed window to be the lowest common checkpoint. If they are all stateless and you end up with long max, you can start with a window id that reflects the current timestamp.
>>
>> Thanks
>>
>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:
>>
>> CommitWindowId could be computed from the existing checkpoints. That solution still needs the purge to be done after commitWindowId is confirmed to be saved in Stram state.
>> Without this, the commitWindowId computed from the checkpoints may have some checkpoints missing.
>>
>> Thks
>> Amol
>>
>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
>>
>> Can't the committedWindowId be calculated by looking at the physical plan and the existing checkpoints?
>>
>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org> wrote:
>>
>> Help Needed for APEXCORE-619
>>
>> Issue: When an application is relaunched after a long time with stateless operators at the end of the DAG, the stateless operators start with a very high windowId. In this case the stateless operator ignores all the data received until the upstream operator catches up with it. This breaks the *at-least-once* guarantee on relaunch of the operator, or when the master is killed and the application is restarted.
>>
>> Solutions:
>> - Fix the windowId for stateless leaf operators from the upstream operator. But it has some issues when we have a join with two upstream operators at different windowIds. If we set the windowId to min(upstream windowId), then we need to recalculate the new recovery window ids for the upstream paths from this operator.
>> - Another solution is to create an empty file in the checkpoint directory for stateless operators. This will help us identify the checkpoints of stateless operators during relaunch, instead of computing from the latest timestamp.
>> - Bring the entire DAG to committedWindowId. This could be achieved by writing committedWindowId to a journal. We need to make sure that we are not purging the checkpointed state until the committedWindowId is saved in the journal.
>>
>> Let me know your thoughts on this and your preferred solution.
>>
>> Regards,
>> -Tushar.
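
For reference, here is a minimal sketch of the computation Pramod describes in the thread: treat a stateless operator's checkpoint as long max, take the lowest checkpoint as the committed window, and fall back to a window id derived from the current timestamp only when every operator is stateless. It also shows the relaunch rule Amol and Tushar suggest, where a stateless operator restarts from the committed window. This is an illustration under simplifying assumptions, not Apex code: the class and method names (CommittedWindowSketch, committedWindow, recoveryWindow) are hypothetical, each operator is assumed to report only its most recent checkpoint, and the DAG walk that may lower upstream recovery windows is left out.

```java
import java.util.Map;

/** Hypothetical sketch; none of these names come from the Apex code base. */
final class CommittedWindowSketch {

    /** Stand-in checkpoint value for an operator with no physical checkpoint. */
    static final long STATELESS = Long.MAX_VALUE;

    /**
     * latestCheckpoint maps an operator name to its most recent checkpoint
     * window id, or STATELESS. currentTimeWindow is the window id that
     * corresponds to the current timestamp.
     */
    static long committedWindow(Map<String, Long> latestCheckpoint, long currentTimeWindow) {
        long min = STATELESS;
        for (long windowId : latestCheckpoint.values()) {
            min = Math.min(min, windowId);
        }
        // Every operator is stateless: nothing to recover, start from "now".
        return min == STATELESS ? currentTimeWindow : min;
    }

    /**
     * A stateless operator restarts from the committed window instead of the
     * current timestamp. In this simplified sketch a stateful operator keeps
     * its own checkpoint.
     */
    static long recoveryWindow(long ownCheckpoint, long committedWindow) {
        return ownCheckpoint == STATELESS ? committedWindow : ownCheckpoint;
    }
}
```

With checkpoints of 10 for an input operator, 8 for a parser, and a stateless console operator, committedWindow yields 8 and the console operator would restart from window 8 rather than from the current timestamp.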