For a long chain of stateless operators at the end of a DAG, it is
possible that the time to propagate the end window to a leaf operator is
greater than the time for a checkpoint to be persisted in HDFS.
If an at-least-once processing guarantee is necessary, the leaf operators
should not be STATELESS. Would invalidating a DAG that has one or more leaf
operators marked as STATELESS with AT_LEAST_ONCE processing solve
APEXCORE-619? It is not the best solution, but I think it is sufficient
for the described scenario.
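For illustration, a minimal sketch of such a validation check, using a
simplified DAG model rather than the actual Apex planner APIs (all names
here are hypothetical):

import java.util.List;

// Simplified model: reject AT_LEAST_ONCE DAGs that have stateless leaf operators.
class DagValidator {
  static class Op {
    final String name;
    final boolean stateless;
    final List<Op> downstream; // empty list => leaf operator

    Op(String name, boolean stateless, List<Op> downstream) {
      this.name = name;
      this.stateless = stateless;
      this.downstream = downstream;
    }
  }

  static void validateAtLeastOnce(List<Op> operators) {
    for (Op op : operators) {
      if (op.downstream.isEmpty() && op.stateless) {
        throw new IllegalStateException("Operator " + op.name
            + " is a STATELESS leaf; AT_LEAST_ONCE cannot be guaranteed");
      }
    }
  }
}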
Thank you,
Vlad
On 3/2/17 08:43, Thomas Weise wrote:
Good point, that's correct for a stateless leaf operator (an operator that
does not have downstream operators). The minimum of the upstream checkpoints
can be higher than the last windowId seen by the leaf operator. Although
that has a low probability, because it would mean the time it took for the
checkpoint to become visible in HDFS was less than the time to propagate
endWindow downstream.
It's also not a problem for an intermediate stateless operator, because the
downstream checkpoint will inform the recovery windowId. Most of the time
stateless operators are intermediate.
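A miniature model of why the intermediate case is safe (my reading of the
above, not Apex internals): a stateless operator's recovery window can be
bounded by the minimum of its downstream checkpoints, and a leaf simply has
no such bound:

import java.util.List;
import java.util.OptionalLong;

class RecoveryWindow {
  // Minimum of the checkpointed windowIds of the directly downstream operators.
  static OptionalLong boundFromDownstream(List<Long> downstreamCheckpoints) {
    return downstreamCheckpoints.stream().mapToLong(Long::longValue).min();
    // An empty OptionalLong means a leaf operator: no downstream bound
    // exists, which is exactly the gap described in APEXCORE-619.
  }
}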
Leaf operators are the output operators. I suspect in the original scenario
it was a console output operator? Useful output operators usually won't be
stateless; they have to track state to interact with the external system
correctly. I'm bringing this up for an adequate cost/benefit analysis.
In the absence of a stateful downstream operator, you only have the committed
windowId, which is essentially a checkpointing watermark. On application
restart it has to be recomputed from the checkpoints available, and does
not cover the scenario Tushar reported originally.
Saving the committed windowId comes at a cost: it would have to be written to
the journal before operators are notified. Care has been taken not to write
unnecessarily to the journal, as it is blocking I/O, and in this case the
frequency depends on the order of arrival of checkpoint notifications from
operators. We also don't want to delay the committedWindow notification, as
that would introduce latency.
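To make the ordering concrete, a minimal sketch under the assumption of a
simple write-ahead journal (the CommitNotifier type here is made up, not
the actual Stram journal code):

import java.io.DataOutputStream;
import java.io.IOException;

class CommitNotifier {
  private final DataOutputStream journal; // in practice backed by HDFS

  CommitNotifier(DataOutputStream journal) {
    this.journal = journal;
  }

  void onCommitted(long committedWindowId, Runnable notifyOperators) throws IOException {
    journal.writeLong(committedWindowId); // blocking write on the commit path...
    journal.flush();                      // ...must be durable before anyone is told
    notifyOperators.run();                // only then notify the operators
  }
}

This is exactly the latency cost described above: every commit notification
would wait on a blocking flush.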
Thomas
On Thu, Mar 2, 2017 at 2:10 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
What if all operators complete their first checkpoints, but the stateless
operator could not cross the first checkpoint window, and the DAG crashed?
If we try to figure out the recovery checkpoint now, we might conclude that
checkpoint 1 is the point to start from, and we may miss some data already
processed by the stateless operator. Probably in this case at-least-once is
also not guaranteed?
~ Bhupesh
_______________________________________________________
Bhupesh Chawda
E: bhup...@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org
On Thu, Mar 2, 2017 at 8:06 AM, Thomas Weise <t...@apache.org> wrote:
Dummy checkpoints, continuously writing the committed window id, and the like
all introduce overhead that is probably not needed.
All the information to derive what we need is likely available, and IMO the
discussion should be on what is the correct way of using it. I will have a
look when I get to it as well.
Thanks,
Thomas
On Wed, Mar 1, 2017 at 6:29 PM, Sandesh Hegde <sand...@datatorrent.com> wrote:
Instead of treating the stateless operator in a special way and missing
corner cases, just have a dummy checkpoint; then there is no need to handle
corner cases.
There is a name for this solution:
https://en.wikipedia.org/wiki/Null_Object_pattern
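In miniature, and purely as an illustration of the pattern (the interfaces
are invented for this sketch, not Apex types): a stateless operator carries
a real checkpoint object that simply holds no state, so no code path has to
special-case "no checkpoint":

interface Checkpoint {
  long getWindowId();
  void restore();
}

// The "null object": a checkpoint with nothing in it. It participates in
// min()-style recovery computations like any other checkpoint.
class NullCheckpoint implements Checkpoint {
  private final long windowId;

  NullCheckpoint(long windowId) {
    this.windowId = windowId;
  }

  public long getWindowId() {
    return windowId;
  }

  public void restore() {
    // stateless: nothing to restore
  }
}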
On Wed, Mar 1, 2017 at 2:52 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
There is code in various places that deals with stateless operators in a
special way even though a physical checkpoint does not exist on the disk.
It is probably a matter of applying similar thought process/logic correctly
here.
On Wed, Mar 1, 2017 at 2:27 PM, Amol Kekre <a...@datatorrent.com> wrote:
Hmm! The fact that commitWindowId has moved up (right now in memory of
Stram) should mean that a complete set of checkpoints is available, i.e.
commitWindowId can be derived. Let's say the next checkpoint window also
gets checkpointed across the app, and commitWindowId is in memory but not
written to Stram state yet; then upon relaunch the latest commitWindowId
should get computed correctly.
This may be just about setting stateless operators to commitWindowId on
re-launch? aka bug/feature?
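If I read that right, the relaunch fixup would look roughly like this (a
simplified sketch over plain maps, not the actual physical plan):

import java.util.Map;

class RelaunchFixup {
  // Pin every stateless operator's recovery checkpoint to the committed
  // windowId instead of a timestamp-derived id.
  static void pinStatelessOperators(Map<String, Long> recoveryCheckpoints,
                                    Map<String, Boolean> statelessByName,
                                    long committedWindowId) {
    for (Map.Entry<String, Long> entry : recoveryCheckpoints.entrySet()) {
      if (Boolean.TRUE.equals(statelessByName.get(entry.getKey()))) {
        entry.setValue(committedWindowId);
      }
    }
  }
}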
Thks
Amol
On Wed, Mar 1, 2017 at 1:41 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
Do we need to save committedWindowId? Can't it be computed from existing
checkpoints by walking through the DAG? We probably do this anyway, and I
suspect there is a minor bug somewhere in there. If an operator is
stateless, you could assume its checkpoint is long max for the sake of
computation and compute the committed window to be the lowest common
checkpoint. If they are all stateless and you end up with long max, you can
start with a window id that reflects the current timestamp.
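A sketch of that computation as I understand it (simplified to plain maps;
the real version would walk the physical plan):

import java.util.Map;

class CommittedWindow {
  static long compute(Map<String, Long> checkpointsByOperator,
                      Map<String, Boolean> statelessByOperator,
                      long timestampDerivedWindowId) {
    long committed = Long.MAX_VALUE;
    for (Map.Entry<String, Long> entry : checkpointsByOperator.entrySet()) {
      // Stateless operators have no physical checkpoint on disk: treat them
      // as Long.MAX_VALUE so they never lower the minimum.
      long cp = Boolean.TRUE.equals(statelessByOperator.get(entry.getKey()))
          ? Long.MAX_VALUE
          : entry.getValue();
      committed = Math.min(committed, cp);
    }
    // All stateless: fall back to a windowId derived from the current time.
    return committed == Long.MAX_VALUE ? timestampDerivedWindowId : committed;
  }
}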
Thanks
On Wed, Mar 1, 2017 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:
CommitWindowId could be computed from the existing checkpoints. That
solution still needs the purge to be done after commitWindowId is confirmed
to be saved in Stram state. Without this, the commitWindowId computed from
the checkpoints may have some checkpoints missing.
Thks
Amol
On Wed, Mar 1, 2017 at 12:36 PM, Pramod Immaneni <pra...@datatorrent.com> wrote:
Can't the committedWindowId be calculated by looking at the physical plan
and the existing checkpoints?
On Wed, Mar 1, 2017 at 5:34 AM, Tushar Gosavi <tus...@apache.org> wrote:
Help Needed for APEXCORE-619
Issue: When an application is relaunched after a long time with stateless
operators at the end of the DAG, the stateless operators start with a very
high windowId. In this case the stateless operator ignores all the data
received until the upstream operator catches up with it. This breaks the
*at-least-once* guarantee on relaunch of the operator, or when the master
is killed and the application is restarted.
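Roughly where the very high windowId comes from, as I understand it (a
simplified sketch; the actual derivation lives in Apex's WindowGenerator,
and the bit layout below is an assumption):

class WindowIds {
  // Assumed Apex-style layout: upper 32 bits carry a base time in seconds,
  // lower 32 bits carry the window sequence number.
  static long fromTimestamp(long timestampMillis) {
    return (timestampMillis / 1000L) << 32;
  }
}

After a long downtime, fromTimestamp(now) is far ahead of every upstream
operator's checkpointed windowId, so the stateless operator drops
everything until upstream catches up.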
Solutions:
- Fix the windowId for stateless leaf operators from the upstream operator.
But this has issues when we have a join with two upstream operators at
different windowIds. If we set the windowId to min(upstream windowId), then
we need to recalculate the new recovery window ids for the upstream paths
from this operator.
- Another solution is to create an empty file in the checkpoint directory
for stateless operators (see the sketch after this list). This will help us
identify the checkpoints of stateless operators during relaunch instead of
computing them from the latest timestamp.
- Bring the entire DAG to committedWindowId. This could be achieved by
writing committedWindowId to a journal. We need to make sure that we are
not purging the checkpointed state until the committedWindowId is saved in
the journal.
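For the empty-file option, a minimal sketch of what the marker write could
look like (the directory layout is invented for illustration, not Apex's
actual checkpoint layout):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class StatelessCheckpointMarker {
  // Write a zero-byte file <baseDir>/<operatorId>/<windowId in hex> so the
  // relaunch scan finds a concrete checkpoint for the stateless operator.
  static void write(Path baseDir, int operatorId, long windowId) throws IOException {
    Path marker = baseDir.resolve(Integer.toString(operatorId))
                         .resolve(Long.toHexString(windowId));
    Files.createDirectories(marker.getParent());
    Files.createFile(marker); // zero bytes: existence is the only state
  }
}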
Let me know your thoughts on this and the preferred solution.
Regards,
-Tushar.