I have opened pull request #490 to handle this scenario by implementing
option 3.

-Tushar.


On Tue, Mar 14, 2017 at 7:46 PM, Thomas Weise <t...@apache.org> wrote:

> The JDBC operator was an example of a valid use case where no state needs
> to be saved in a checkpoint (it is saved to the database instead). We
> cannot break such use cases and neither should the application developer be
> exposed to it, especially not in backward incompatible fashion.
>
> I think that the platform/operators need to handle this. Perhaps we can
> look at all the "stateless" operators and see which ones really depend on
> at-least-once processing (to produce exactly-once results) and then change
> the remaining to have default processing mode at-most-once? That would
> require separate annotation support to indicate a default processing mode
> for an operator.
>
> Thanks,
> Thomas
>
>
>
>
>
>
>
>
> On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tus...@datatorrent.com>
> wrote:
>
> > Hi Vlad,
> >
> > I am more worried about backward compatibility. Most new users will not
> > be able to launch the simple applications that are the starting point for
> > learning Apex, such as PiDemo and WordCount. We could provide a way for a
> > user to opt out of the empty file storage agent for leaf stateless
> > operators if they are more worried about name node operations.
> >
> > - Tushar.
> >
> >
> > On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v.ro...@datatorrent.com>
> > wrote:
> >
> >> I would prefer to go with option 2 (and maybe reuse the -force flag to
> >> allow launching applications that do not validate due to the newly
> >> introduced rule). I am not sure that it is OK to outsmart the application
> >> designer and force a stateless operator to become stateful.
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >> *Join us at Apex Big Data World-San Jose
> >> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017*
> >> [image: http://www.apexbigdata.com/san-jose-register.html]
> >> <http://www.apexbigdata.com/san-jose-register.html>
> >> On 3/10/17 07:38, Thomas Weise wrote:
> >>
> >> +1
> >>
> >> But keep in mind it will cause unnecessary name node operations and
> >> therefore it would be good to only use it when it is really needed (i.e.
> >> the operator in reality isn't stateless, it stores its state somewhere
> >> else).
> >>
> >> Can we look at optimizing the behavior for "stateless" operators that are
> >> really stateless? For example, should the console operator be
> >> AT_MOST_ONCE by default?
> >>
> >>
> >> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> >> wrote:
> >>
> >>
> >> My preference is also for option 3. It looks clean and simple to
> >> implement.
> >>
> >> ~ Bhupesh
> >>
> >>
> >> _______________________________________________________
> >>
> >> Bhupesh Chawda
> >>
> >> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> >> www.datatorrent.com  |  apex.apache.org
> >>
> >>
> >>
> >> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tus...@datatorrent.com>
> >> wrote:
> >>
> >>
> >> Can you please let me know your preference? My preference is for
> >> solution 3, by adding a StorageAgent which creates an empty file, and
> >> using this storage agent for leaf stateless operators.
> >>
> >> - Tushar.
> >>
> >>
> >> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tus...@datatorrent.com>
> >> wrote:
> >>
> >>
> >> Thank you all for the feedback.
> >>
> >> Some useful output operators can be stateless: they push the data
> >> received in a window to the output store, for example
> >> KafkaOutputOperator/JDBCOutputOperator, or output stores where writes
> >> are idempotent, which covers most key-value stores.
> >>
> >> I was going to use the existing logic to compute the committedWindowId,
> >> with the addition of a few steps explained below.
> >> solution-1
> >> - Calculate committedWindow with leaf operator checkpoints set to the
> >> current timestamp (current behaviour).
> >> - Update the leaf operators' recoveryWindowId to committedWindowId.
> >> - Calculate committedWindow again. This step is required because the
> >> downstream operator's recoveryWindowId is reduced, and hence we may have
> >> to adjust the recoveryWindowId of upstream operators.
> >>
> >> This will prevent leaf stateless operators from starting at the current
> >> timestamp, hence reducing the amount of data loss. But as per the concern
> >> raised by Bhupesh about the last stateless operator being slow, the
> >> solution suggested by Vlad is sufficient.
> >>
> >> solution-1
> >> - As explained above. If a little loss is expected, we could go with
> >> this approach.
> >> solution-2
> >> - Fail validation if the last operator is stateless in the AT_LEAST_ONCE
> >> scenario, as suggested by Vlad.
> >>   This could break backward compatibility, as old applications will fail
> >> to launch.
> >> solution-3
> >> - Mark the last operator stateful in the AT_LEAST_ONCE scenario.
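
The three recalculation steps listed under solution-1 can be sketched roughly as follows. This is a toy model, not Apex code: it assumes every operator exposes a list of checkpoint window ids (empty when stateless), and that an operator recovers from its latest checkpoint that is not newer than any downstream recovery window. All names here (`recovery_window_ids`, `solution_1`, `INITIAL_WINDOW_ID`, the operator labels) are hypothetical.

```python
from typing import Dict, List

# Hypothetical fallback when an operator has no checkpoint old enough.
INITIAL_WINDOW_ID = 0


def recovery_window_ids(
    checkpoints: Dict[str, List[int]],
    downstream: Dict[str, List[str]],
    leaf_default: int,
) -> Dict[str, int]:
    """Compute one recovery window id per operator, leaves first."""
    recovery: Dict[str, int] = {}

    def visit(op: str) -> int:
        if op in recovery:
            return recovery[op]
        children = downstream.get(op, [])
        # Lowest recovery window among downstream operators, if any exist.
        bound = min((visit(child) for child in children), default=None)
        own = checkpoints[op]
        if not own:
            # Stateless: follow downstream, or use the default when a leaf.
            result = leaf_default if bound is None else bound
        elif bound is None:
            # Stateful leaf: recover from its latest checkpoint.
            result = max(own)
        else:
            # Stateful with downstream: latest checkpoint not newer than bound.
            result = max((c for c in own if c <= bound), default=INITIAL_WINDOW_ID)
        recovery[op] = result
        return result

    for op in checkpoints:
        visit(op)
    return recovery


def solution_1(
    checkpoints: Dict[str, List[int]],
    downstream: Dict[str, List[str]],
    current_window_id: int,
) -> Dict[str, int]:
    # Step 1: stateless leaves start from the current timestamp.
    first = recovery_window_ids(checkpoints, downstream, current_window_id)
    committed = min(first.values())
    # Steps 2 and 3: pin stateless leaves to committed, then recompute so
    # that upstream operators are adjusted to the lowered leaf.
    return recovery_window_ids(checkpoints, downstream, committed)


# Hypothetical join: A and B both feed the stateless leaf C.
example_checkpoints = {"A": [10, 30], "B": [10], "C": []}
example_downstream = {"A": ["C"], "B": ["C"]}
print(solution_1(example_checkpoints, example_downstream, current_window_id=1000))
```

In this example the first pass leaves A at its latest checkpoint while B holds the committed window back; the second pass is what pulls A down to match the leaf, which is the adjustment the last step refers to.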
> >>
> >> Let me know about your preference.
> >>
> >> Regards,
> >> - Tushar.
> >>
> >>
> >> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.ro...@datatorrent.com>
> >> wrote:
> >>
> >>
> >> For a long chain of stateless operators at the end of a DAG, it is
> >> possible that time to propagate the end window to a leaf operator is
> >> greater than the time for a checkpoint to be persisted in HDFS.
> >>
> >> If the at-least-once processing guarantee is necessary, the leaf operators
> >> should not be STATELESS. Will invalidating a DAG that has one or more leaf
> >> operators marked as STATELESS with AT_LEAST_ONCE processing solve
> >> APEXCORE-619? It is not the best solution, but I think it is sufficient
> >> for the described scenario.
> >>
> >> Thank you,
> >>
> >> Vlad
> >>
> >>
> >> On 3/2/17 08:43, Thomas Weise wrote:
> >>
> >>
> >> Good point, that's correct for a stateless leaf operator (an operator
> >> that does not have downstream operators). The minimum of upstream
> >> checkpoints can be higher than the last windowId seen by the leaf
> >> operator. That has a low probability, though, because it would mean the
> >> time it took for the checkpoint to become visible in HDFS is less than
> >> the propagation of endWindow downstream.
> >>
> >> It's also not a problem for an intermediate stateless operator, because
> >> the downstream checkpoint will inform the recovery windowId. Most of the
> >> time stateless operators are intermediate.
> >>
> >> Leaf operators are the output operators. I suspect in the original
> >> scenario it was a console output operator? Useful output operators
> >> usually won't be stateless; they have to track state to interact with the
> >> external system correctly. I'm bringing this up for adequate cost/benefit
> >> analysis.
> >>
> >> In the absence of a stateful downstream operator, you only have the
> >> committed windowId, which is essentially a checkpointing watermark. On
> >> application restart it has to be recomputed from the checkpoints
> >> available, and does not cover the scenario Tushar reported originally.
> >>
> >> Saving the committed windowId comes at a cost: it would have to be
> >> written to the journal before operators are notified. Care has been taken
> >> to not write unnecessarily to the journal, as it is blocking I/O, and in
> >> this case the frequency depends on the order of arrival of checkpoint
> >> notifications from operators. We also don't want to delay the
> >> committedWindow notification, as that would introduce latency.
> >>
> >> Thomas
> >>
> >>
> >> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> >> wrote:
> >>
> >> What if all operators complete their first checkpoints but the stateless
> >> operator could not cross the first checkpoint window, and the DAG crashed?
> >> If we try to figure out the recovery checkpoint now, we might conclude
> >> that checkpoint 1 is the point to start, and we may miss some data getting
> >> processed by the stateless operator. Probably in this case at-least-once
> >> is also not guaranteed?
> >>
> >> ~ Bhupesh
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> wrote:
> >>
> >> Dummy checkpoints, continuously writing the committed window id, and the
> >> like all introduce overhead that is probably not needed.
> >>
> >> All the information to derive what we need is likely available, and IMO
> >> the discussion should be on what is the correct way of using it. I will
> >> have a look when I get to it as well.
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sand...@datatorrent.com>
> >> wrote:
> >>
> >> Instead of treating the stateless operator in a special way and missing
> >> corner cases, just have a dummy checkpoint; then there is no need to
> >> handle corner cases.
> >>
> >> There is a name for this solution:
> >> https://en.wikipedia.org/wiki/Null_Object_pattern
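
As a rough illustration of the dummy-checkpoint idea, the sketch below records a zero-byte marker file per window id so that recovery can list checkpoints for a stateless operator the same way it does for a stateful one. It is only a sketch under assumptions: the class `EmptyFileCheckpoint`, its methods, and the hexadecimal file naming are hypothetical and do not reproduce Apex's StorageAgent interface or on-disk layout.

```python
import tempfile
from pathlib import Path
from typing import List


class EmptyFileCheckpoint:
    """Hypothetical null-object checkpoint store: keeps window ids, no state."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def save(self, operator_id: int, window_id: int) -> None:
        # One directory per operator, one empty marker file per window id.
        op_dir = self.root / str(operator_id)
        op_dir.mkdir(parents=True, exist_ok=True)
        (op_dir / format(window_id, "x")).touch()

    def window_ids(self, operator_id: int) -> List[int]:
        # Recover the checkpointed window ids from the marker file names.
        op_dir = self.root / str(operator_id)
        if not op_dir.is_dir():
            return []
        return sorted(int(path.name, 16) for path in op_dir.iterdir())


# Usage: a stateless leaf (hypothetical operator id 3) checkpoints two windows.
with tempfile.TemporaryDirectory() as tmp:
    store = EmptyFileCheckpoint(Path(tmp))
    store.save(3, 10)
    store.save(3, 20)
    print(store.window_ids(3))
```

The cost of this approach is one file creation per checkpoint per operator, with no state actually serialized.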
> >>
> >>
> >>
> >> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pra...@datatorrent.com>
> >> wrote:
> >>
> >> There is code in various places that deals with stateless operators in a
> >> special way even though a physical checkpoint does not exist on the disk.
> >>
> >> It is probably a matter of applying a similar thought process/logic
> >> correctly here.
> >>
> >> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com>
> >> wrote:
> >>
> >> Hmm! The fact that commitWindowId has moved up (right now in the memory
> >> of Stram) should mean that a complete set of checkpoints is available,
> >> i.e. commitWindowId can be derived. Let's say that the next checkpoint
> >> window also gets checkpointed across the app, and commitWindowId is in
> >> memory but not written to stram-state yet; then upon relaunch the latest
> >> commitWindowId should get computed correctly.
> >>
> >> This may be just about setting stateless operators to commitWindowId on
> >> relaunch? aka bug/feature?
> >>
> >> Thks
> >> Amol
> >>
> >>
> >> E:a...@datatorrent.com | M: 510-449-2606 | Twitter: @*amolhkekre*
> >>
> >> www.datatorrent.com  |  apex.apache.org
> >>
> >>
> >> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com>
> >> wrote:
> >>
> >> Do we need to save committedWindowId? Can't it be computed from the
> >> existing checkpoints by walking through the DAG? We probably do this
> >> anyway, and I suspect there is a minor bug somewhere in there. If an
> >> operator is stateless, you could assume its checkpoint to be long max for
> >> the sake of computation and compute the committed window to be the lowest
> >> common checkpoint. If they are all stateless and you end up with long
> >> max, you can start with a window id that reflects the current timestamp.
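
A minimal sketch of the computation described above, assuming a simplified model in which each operator reports only its most recent checkpoint window id and a stateless operator reports none. The names `committed_window_id` and `LONG_MAX` and the operator labels are hypothetical, not Apex APIs, and taking a plain minimum stands in for "lowest common checkpoint" only under that one-checkpoint-per-operator assumption.

```python
from typing import Dict, Optional

# Stand-in for Java's Long.MAX_VALUE ("long max" in the message above).
LONG_MAX = 2**63 - 1


def committed_window_id(
    latest_checkpoint: Dict[str, Optional[int]],
    current_window_id: int,
) -> int:
    """Return the lowest checkpoint window id across all operators.

    A stateless operator (value None) is treated as LONG_MAX so that it never
    lowers the result. If every operator is stateless, fall back to the window
    id that reflects the current timestamp.
    """
    effective = [
        LONG_MAX if window_id is None else window_id
        for window_id in latest_checkpoint.values()
    ]
    lowest = min(effective, default=LONG_MAX)
    return current_window_id if lowest == LONG_MAX else lowest


# Hypothetical three-operator DAG: input -> parser -> console (stateless leaf).
dag = {"input": 120, "parser": 90, "console": None}
print(committed_window_id(dag, current_window_id=500))
```

Using a sentinel larger than any real window id keeps the stateless case out of the main path: the minimum is taken uniformly and only the all-stateless outcome needs a special branch.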
> >>
> >> Thanks
> >>
> >> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com>
> >> wrote:
> >>
> >> CommitWindowId could be computed from the existing checkpoints. That
> >> solution still needs purge to be done after commitWindowId is confirmed
> >> to be saved in Stram state. Without this, the commitWindowId computed
> >> from the checkpoints may have some checkpoints missing.
> >>
> >> Thks
> >> Amol
> >>
> >>
> >>
> >> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com>
> >> wrote:
> >>
> >> Can't the committedWindowId be calculated by looking at the physical plan
> >> and the existing checkpoints?
> >>
> >> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org>
> >> wrote:
> >>
> >> Help Needed for APEXCORE-619
> >>
> >> Issue: When an application is relaunched after a long time with
> >> stateless operators at the end of the DAG, the stateless operators start
> >> with a very high windowId. In this case the stateless operator ignores
> >> all the data received until the upstream operator catches up with it.
> >> This breaks the *at-least-once* guarantee on relaunch of the operator or
> >> when the master is killed and the application is restarted.
> >>
> >> Solutions:
> >> - Fix the windowId for stateless leaf operators from the upstream
> >> operator. But it has some issues when we have a join with two upstream
> >> operators at different windowIds. If we set the windowId to
> >> min(upstream windowId), then we need to recalculate the new recovery
> >> window ids for the upstream paths from this operator.
> >>
> >> - Another solution is to create an empty file in the checkpoint directory
> >> for stateless operators. This will help us identify the checkpoints of
> >> stateless operators during relaunch instead of computing from the latest
> >> timestamp.
> >>
> >> - Bring the entire DAG to committedWindowId. This could be achieved by
> >> writing committedWindowId in a journal. We need to make sure that we are
> >> not purging the checkpointed state until the committedWindowId is saved
> >> in the journal.
> >>
> >> Let me know your thoughts on this and your preferred solution.
> >>
> >> Regards,
> >> -Tushar.
> >>
> >>
> >>
> >>
> >>
> >>
> >
>
