The JDBC operator was an example of a valid use case where no state needs
to be saved in a checkpoint (it is saved to the database instead). We
cannot break such use cases and neither should the application developer be
exposed to it, especially not in backward incompatible fashion.

I think that the platform/operators need to handle this. Perhaps we can
look at all the "stateless" operators and see which ones really depend on
at-least-once processing (to produce exactly-once results) and then change
the remaining to have default processing mode at-most-once? That would
require separate annotation support to indicate a default processing mode
for an operator.

Thanks,
Thomas








On Tue, Mar 14, 2017 at 4:26 AM, Tushar Gosavi <tus...@datatorrent.com>
wrote:

> Hi Vlad,
>
> I am more worried about backward compatibility. Most of the new users will
> be not able to launch simple applications which are starting point while
> learning Apex such as PiDemo and WordCount. We could provide a way for user
> to not use empty file storage agent for leaf stateless operators if he is
> more worried about name node operations.
>
> - Tushar.
>
>
> On Sat, Mar 11, 2017 at 6:10 AM, Vlad Rozov <v.ro...@datatorrent.com>
> wrote:
>
>> I would prefer to go with option 2 (and maybe reuse -force flag to allow
>> launching application that do not validate due to newly introduced rule). I
>> am not sure that it is OK to outsmart application designer and force
>> stateless operator to become statefull.
>>
>> Thank you,
>>
>> Vlad
>>
>> *Join us at Apex Big Data World-San Jose
>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017*
>> [image: http://www.apexbigdata.com/san-jose-register.html]
>> <http://www.apexbigdata.com/san-jose-register.html>
>> On 3/10/17 07:38, Thomas Weise wrote:
>>
>> +1
>>
>> But keep in mind it will cause unnecessary name node operations and
>> therefore it would be good to only use it when it is really needed (i.e.
>> the operator in reality isn't stateless, it stores its state somewhere
>> else).
>>
>> Can we look at optimizing the behavior for "stateless" operators that are
>> really stateless. For example the console operator should by default be
>> AT_MOST_ONCE?
>>
>>
>> On Fri, Mar 10, 2017 at 1:45 AM, Bhupesh Chawda <bhup...@datatorrent.com> 
>> <bhup...@datatorrent.com>
>> wrote:
>>
>>
>> My preference is also for option 3. It looks clean and simple to implement.
>>
>> ~ Bhupesh
>>
>>
>> _______________________________________________________
>>
>> Bhupesh Chawda
>>
>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>> www.datatorrent.com  |  apex.apache.org
>>
>>
>>
>> On Fri, Mar 10, 2017 at 3:06 PM, Tushar Gosavi <tus...@datatorrent.com> 
>> <tus...@datatorrent.com>
>> wrote:
>>
>>
>> Can you please let me know your preference? My preference is for solution
>> 3, by adding a StorageAgent which creates an empty file, and using this
>> storage agent for leaf stateless operators.
>>
>> - Tushar.
>>
>>
>> On Tue, Mar 7, 2017 at 1:52 PM, Tushar Gosavi <tus...@datatorrent.com> 
>> <tus...@datatorrent.com>
>> wrote:
>>
>>
>> Thank you all for the feedback.
>>
>> Some of the useful output operator can be stateless, they push data
>> received in a window to output store. for example KafkaOutputOperator/
>>
>> JDBCOutputOperator,
>>
>> or the output stores where
>> writes are idempotent, which covers most of the key-value stores.
>>
>> I was going to use the existing logic to compute the committedWindowId
>> with addition of few steps explained below.
>> solution-1
>> - Calculate committedWindow with leaf operator checkpoints set to
>>
>> current
>>
>> timestamp (current behaviour)
>> - Update leaf operators recoveryWindowId to committedWindowId
>> - Calculate committedWindow again, this steps is required because as
>> downstream operator recoveryWindowId is reduced and hence we may have
>>
>> to
>>
>> adjust the recoveryWindowId of upstream operators.
>>
>> This will prevent leaf stateless opeartors to start from current
>> timestamp, hence reducing amount of data loss. But As per the concern
>> raised by Bhupesh about last stateless operator being slow, the
>>
>> solution
>>
>> suggested by Vlad is sufficient
>>
>> solution-1
>> - as explained above. If little loss is expected we could go with this
>> appraoch.
>> solution-2
>> - Fail validation if last operator is stateless in AT_LEAST_ONCE
>>
>> scenario
>>
>> as suggested by Vlad.
>>   This could break backward compatibility as old applications will fail
>>
>> to
>>
>> launch.
>> solution-3
>> - Mark last operator stateful in AT_LEAST_ONCE scenario.
>>
>> Let me know about your preference.
>>
>> Regards,
>> - Tushar.
>>
>>
>> On Mon, Mar 6, 2017 at 8:31 PM, Vlad Rozov <v.ro...@datatorrent.com> 
>> <v.ro...@datatorrent.com>
>> wrote:
>>
>>
>> For a long chain of stateless operators at the end of a DAG, it is
>> possible that time to propagate the end window to a leaf operator is
>> greater than the time for a checkpoint to be persisted in HDFS.
>>
>> If at least once processing guarantee is necessary, the leaf operators
>> should not be STATELESS. Will invalidating DAG that has one or more
>>
>> leaf
>>
>> operator marked as STATELESS with AT_LEAST_ONCE processing solve
>> APEXCORE-619? It is not the best solution, but I think it is
>>
>> sufficient
>>
>> for
>>
>> the described scenario.
>>
>> Thank you,
>>
>> Vlad
>>
>>
>> On 3/2/17 08:43, Thomas Weise wrote:
>>
>>
>> Good point, that's correct for a stateless leaf operator (operator
>>
>> that
>>
>> does not have downstream operators). The minimum of upstream
>>
>> checkpoints
>>
>> can be higher than the last windowId seen by the leaf operator.
>>
>> Although
>>
>> that is a low probability, because it would mean the time it took for
>>
>> the
>>
>> checkpoint to become visible in HDFS is less than propagation of
>> endWindow
>> downstream.
>>
>> It's also not a problem for an intermediate stateless operator,
>>
>> because
>>
>> the
>> downstream checkpoint will inform the recovery windowId. Most of the
>>
>> time
>>
>> stateless operators are intermediate.
>>
>> Leaf operators are the output operators. I suspect in the original
>> scenario
>> is was a console output operator? Useful output operators usually
>>
>> won't
>>
>> be
>> stateless, they have to track state to interact with the external
>>
>> system
>>
>> correctly. I'm bringing this up for adequate cost/benefit analysis.
>>
>> In absence of stateful downstream operator, you only have the
>>
>> committed
>>
>> windowId, which is essentially a checkpointing watermark. On
>>
>> application
>>
>> restart it has to be recomputed from the checkpoints available, and
>>
>> does
>>
>> not cover the scenario Tushar reported originally.
>>
>> Saving committed windowId comes at a cost, it would have to be
>>
>> written
>>
>> to
>>
>> the journal before operators are notified. Care has been taken to no
>> write
>> unnecessarily to the journal, as it is blocking I/O and in this case
>>
>> the
>>
>> frequency depends on the order of arrival of checkpoint notifications
>> from
>> operators. We also don't want to delay commitedWindow notification,
>>
>> as
>>
>> that
>> would introduce latency.
>>
>> Thomas
>>
>>
>> On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <
>>
>> bhup...@datatorrent.com>
>>
>> wrote:
>>
>> What if all operators complete first checkpoints but the stateless
>>
>> operator
>> could not cross the first checkpoint window, and the DAG crashed.
>> If we try to figure out the recovery checkpoint now, we might
>>
>> conclude
>>
>> that
>> checkpoint 1 is the point to start and we may miss some data getting
>> processed by the stateless operator. Probably in this case at-least
>> once is
>> also not guaranteed?
>>
>> ~ Bhupesh
>>
>>
>> _______________________________________________________
>>
>> Bhupesh Chawda
>>
>> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>> www.datatorrent.com  |  apex.apache.org
>>
>>
>>
>> On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> 
>> <t...@apache.org>
>>
>> wrote:
>>
>> Dummy checkpoints, continuously writing committed window id and the
>>
>> like
>>
>> all introduce overhead that is probably not needed.
>>
>> All the information to derive what we need is likely available and
>>
>> IMO
>>
>> the
>>
>>
>> discussion should be on what is the correct way of using it. I will
>> have
>>
>>
>> a
>>
>>
>> look when I get to it as well.
>>
>> Thanks,
>> Thomas
>>
>>
>> On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <
>>
>> sand...@datatorrent.com
>>
>> wrote:
>>
>> Instead of treating the stateless operator in a special way and
>>
>> missing
>>
>> corner cases, just have a dummy checkpoint, then there is no need
>>
>> to
>>
>> handle
>>
>>
>> corner cases.
>>
>> There is a name for this 
>> solution,https://en.wikipedia.org/wiki/Null_Object_pattern
>>
>>
>>
>> On Wed, Mar 1, 2017 at 2:52 PM Pramod Immaneni <pra...@datatorrent.com
>> wrote:
>>
>> There is code in various places that deals with stateless
>>
>> operators
>>
>> in
>>
>> a
>>
>>
>> special way even though a physical checkpoint does not exist on
>>
>> the
>>
>> disk.
>>
>> It is probably a matter of applying similar thought process/logic
>>
>> correctly
>>
>>
>> here.
>>
>> On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com
>>
>> wrote:
>>
>> hmm! the fact that commitWindowId has moved up (right now in
>>
>> memory
>>
>> of
>>
>> Stram) should mean that a complete set of checkpoints are
>>
>> available,
>>
>> i.e
>>
>> commitWindowId can be derived. Lets say that next checkpoint
>>
>> window
>>
>> also
>>
>> gets checkpointed across the app, commitwindowID is in memory but
>>
>> not
>>
>> written to stram-state yet, then upon relaunch the latest
>>
>> commitwindowID
>>
>> should get computed correctly.
>>
>> This may be just about setting stateless operators to
>>
>>
>> commitWindowid
>>
>> on
>>
>>
>> re-launch? aka bug/feature?
>>
>> Thks
>> Amol
>>
>>
>> E:a...@datatorrent.com | M: 510-449-2606 <(510)%20449-2606> 
>> <(510)%20449-2606> |
>>
>>
>> Twitter:
>>
>> @*amolhkekre*
>>
>> www.datatorrent.com  |  apex.apache.org
>>
>> *Join us at Apex Big Data World-San 
>> Jose<http://www.apexbigdata.com/san-jose.html> 
>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
>> [image: 
>> http://www.apexbigdata.com/san-jose-register.html]<http://www.apexbigdata.com/san-jose-register.html>
>>  <http://www.apexbigdata.com/san-jose-register.html>
>>
>> On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <
>>
>>
>> pra...@datatorrent.com>
>>
>> wrote:
>>
>> Do we need to save committedWindowId? Can't it be computed from
>>
>> existing
>>
>> checkpoints by walking through the DAG. We probably do this
>>
>> anyway
>>
>> and
>>
>> I
>>
>>
>> suspect there is a minor bug somewhere in there. If an operator
>>
>> is
>>
>> stateless you could assume checkpoint as long max for sake of
>>
>> computation
>>
>> and compute the committed window to be the lowest common
>>
>> checkpoint.
>>
>> If
>>
>>
>> they are all stateless and you end up with long max you can start
>>
>> with
>>
>> window id that reflects the current timestamp.
>>
>> Thanks
>>
>> On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <
>>
>> a...@datatorrent.com
>>
>> wrote:
>>
>> CommitWindowId could be computed from the existing checkpoints.
>>
>> That
>>
>> solution still needs purge to be done after commitWindowId is
>>
>> confirmed
>>
>> to
>>
>> be saved in Stram state. Without ths the commitWindowId
>>
>>
>> computed
>>
>> from
>>
>> the
>>
>> checkpoints may have some checkpoints missing.
>>
>> Thks
>> Amol
>>
>>
>> E:a...@datatorrent.com | M: 510-449-2606 <(510)%20449-2606> 
>> <(510)%20449-2606> |
>>
>>
>> Twitter: @*amolhkekre*
>>
>> www.datatorrent.com  |  apex.apache.org
>>
>> *Join us at Apex Big Data World-San 
>> Jose<http://www.apexbigdata.com/san-jose.html> 
>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
>> [image: 
>> http://www.apexbigdata.com/san-jose-register.html]<http://www.apexbigdata.com/san-jose-register.html>
>>  <http://www.apexbigdata.com/san-jose-register.html>
>>
>> On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <
>>
>>
>> pra...@datatorrent.com
>>
>> wrote:
>>
>> Can't the commitedWindowId be calculated by looking at the
>>
>> physical
>>
>> plan
>>
>> and the existing checkpoints?
>>
>> On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <
>>
>>
>> tus...@apache.org
>>
>> wrote:
>>
>> Help Needed for APEXCORE-619
>>
>> Issue : When application is relaunched after long time with
>>
>>
>> stateless
>>
>> opeartors at the end of the DAG, the stateless operators
>>
>> starts
>>
>> with
>>
>> a
>>
>>
>> very
>>
>> high windowId. In this case the stateless operator ignors
>>
>>
>> all
>>
>> the
>>
>> data
>>
>> received till upstream operator catches up with it. This
>>
>> breaks
>>
>> the
>>
>> *at-least-once* gaurantee while relaunch of the opeartor or
>>
>> when
>>
>> master
>>
>> is
>>
>> killed and application is restarted.
>>
>> Solutions:
>> - Fix windowId for stateless leaf operators from upstream
>>
>>
>> opeartor.
>>
>> But
>>
>> it
>>
>> has some issues when we have a join with two upstrams
>>
>>
>> operators
>>
>> at
>>
>> different windowId. If we set the windowID to min(upstream
>>
>> windowId),
>>
>> then
>>
>> we need to again recalulate the new recovery window ids for
>>
>>
>> upstream
>>
>> paths
>>
>> from this operators.
>>
>> - Other solution is to create a empty file in checkpoint
>>
>>
>> directory
>>
>> for
>>
>> stateless operators. This will help us to identify the
>>
>> checkpoints
>>
>> of
>>
>>
>> stateless operators during relaunch instead of computing
>>
>> from
>>
>> latest
>>
>> timestamp.
>>
>> - Bring the entire DAG to committedWindowId. This could be
>>
>>
>> achived
>>
>> using
>>
>> writing committedWindowId in a journal. we need to make
>>
>> sure
>>
>> that
>>
>> we
>>
>> are
>>
>> not puring the checkpointed state until the
>>
>> committedWundowId
>>
>> is
>>
>> saved
>>
>> in
>>
>>
>> journal.
>>
>> Let me know your thoughs on this and preferred solution.
>>
>> Regards,
>> -Tushar.
>>
>> --
>>
>> *Join us at Apex Big Data World-San 
>> Jose<http://www.apexbigdata.com/san-jose.html> 
>> <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!*
>> [image: http://www.apexbigdata.com/san-jose-register.html]
>>
>>
>>
>>
>>
>

Reply via email to