But are they chained together? Could you provide the code from your job, at
least up to operator A?

On Wed, Aug 7, 2019 at 3:03 PM Jan Lukavský <je...@seznam.cz> wrote:

> Actually, operator A is an intermediate operator; the source precedes it.
>
> On 8/7/19 2:44 PM, Kostas Kloudas wrote:
> > Hi Jan,
> >
> > After looking at the code, my point 1) is false for *intermediate* tasks
> > and if you are using a watermark assigner. In these cases, Flink checks
> > that the "next" watermark is greater than the "previous" one.
> >
> > But if your operator A is a source and you emit watermarks from the
> > source, then your watermark can appear to go backwards on operator A,
> > while operator B does the "correction" by discarding smaller watermarks.
> > That could explain your observation.
> >
> > Cheers,
> > Kostas
> >
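To make the "correction" on operator B concrete, here is a minimal sketch in plain Java with no Flink dependencies. The class `WatermarkValveSketch` and its methods are hypothetical; as far as I understand, it mirrors the idea behind Flink's internal `StatusWatermarkValve` (ignoring idle/stream-status handling): each input channel remembers its highest watermark and drops smaller ones, and the operator forwards the minimum across channels only when that minimum advances.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical sketch: combines watermarks arriving on several input channels. */
public class WatermarkValveSketch {
    // Highest watermark seen so far on each input channel.
    private final long[] channelWatermarks;
    // Last watermark forwarded downstream; starts at the "beginning of time".
    private long lastEmitted = Long.MIN_VALUE;

    public WatermarkValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Feeds a watermark arriving on one channel. Returns the new combined
     * watermark if it advanced, or empty if nothing should be forwarded.
     */
    public OptionalLong onWatermark(int channel, long watermark) {
        // A watermark that does not advance this channel is simply discarded.
        if (watermark <= channelWatermarks[channel]) {
            return OptionalLong.empty();
        }
        channelWatermarks[channel] = watermark;

        // The operator's input watermark is the minimum over all channels.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();

        // Forward the combined watermark only if it moves forward.
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public long currentWatermark() {
        return lastEmitted;
    }

    public static void main(String[] args) {
        WatermarkValveSketch valve = new WatermarkValveSketch(2);
        System.out.println(valve.onWatermark(0, 100)); // OptionalLong.empty (channel 1 still at MIN)
        System.out.println(valve.onWatermark(1, 80));  // OptionalLong[80]
        System.out.println(valve.onWatermark(1, 50));  // OptionalLong.empty (went backwards, discarded)
        System.out.println(valve.onWatermark(1, 120)); // OptionalLong[100]
    }
}
```

In this sketch the forwarded watermark never decreases, even when one input goes backwards. Because nothing in it is persisted, a freshly constructed instance (e.g. after a restore) starts again at `Long.MIN_VALUE`.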
> > On Wed, Aug 7, 2019 at 2:30 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Hi Kostas,
> >>
> >> thanks for the reaction; comments inline.
> >>
> >> On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> >>> Hi Jan,
> >>>
> >>> Two pointers that may help you explain the behavior are the following:
> >>>
> >>> 1) If you have a custom watermark generator, I do not think that Flink
> >>> checks whether it emits only monotonically increasing watermarks. This
> >>> is the responsibility of the generator itself. This means that although
> >>> your operator A is topologically before operator B, operator A may have
> >>> a smaller watermark if your watermark generator allows it.
> >> I do generate watermarks in a custom source, but I believe that the
> >> generated sequence is monotonic. Still, even if the generated watermark
> >> actually decreased, would that mean that the operator downstream of the
> >> source (operator A) would actually "go back in time"?
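Regarding point 1, a custom source can guarantee monotonicity itself by never emitting a watermark below the last one it emitted. A minimal sketch in plain Java, assuming the source computes some watermark estimate that can occasionally regress (for example, a minimum over partitions that drops when a partition with older data is picked up). The class `MonotonicWatermarkGuard` is hypothetical, not a Flink or Beam API; in a Beam `UnboundedSource`, the clamped value would presumably be what the reader's `getWatermark()` returns.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper for a custom source: turns a possibly non-monotonic
 * watermark estimate into a non-decreasing sequence of emitted watermarks.
 */
public class MonotonicWatermarkGuard {
    // Last watermark handed downstream; "beginning of time" until the first one.
    private long lastEmitted = Long.MIN_VALUE;

    /** Returns the watermark to emit for the given estimate, never lower than before. */
    public long clamp(long estimate) {
        if (estimate > lastEmitted) {
            lastEmitted = estimate;
        }
        return lastEmitted;
    }

    public static void main(String[] args) {
        MonotonicWatermarkGuard guard = new MonotonicWatermarkGuard();
        // Estimates as a source might compute them; 20 is a regression,
        // e.g. after a partition with older data was picked up.
        long[] estimates = {10, 30, 20, 40};
        List<Long> emitted = new ArrayList<>();
        for (long e : estimates) {
            emitted.add(guard.clamp(e));
        }
        System.out.println(emitted); // [10, 30, 30, 40]
    }
}
```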
> >>> 2) Flink currently does not checkpoint the last seen watermark
> >>> (https://issues.apache.org/jira/browse/FLINK-5601).
> >>> This means that after restoring, your (event) time is assumed to be
> >>> Long.MIN_VALUE until the first new watermark arrives.
> >>> So if you observed late data no longer being late, or something similar,
> >>> then it may not be that the two operators have different watermarks,
> >>> but that after restoring, event time rolls back to the "beginning of
> >>> time".
> >> I actually didn't observe any wrong or unexpected behavior, exceptions,
> >> or wrong outputs. I just noticed this in Flink's WebUI and it looked
> >> strange to me. Could it be that the WebUI just showed an older watermark
> >> for operator A? What was strange is that the watermarks stayed on my
> >> screen long enough to take a screenshot (so the WebUI showed a watermark
> >> for operator A lower than the one for operator B for at least, say, 10
> >> seconds). Even if watermarks are not checkpointed, would it still be
> >> possible for the watermark of operator B to actually be greater? I'm
> >> still confused about how this could happen, because (in my understanding)
> >> the output watermark of operator A should be greater than or equal to
> >> the input watermark of B (because B takes the minimum of its inputs).
> >>
> >> Sorry if I'm digging too deep into this, but I don't like things I
> >> cannot explain, as they might point to a bug somewhere. :-) Or to my
> >> mental model not being aligned with reality.
> >>
> >> Jan
> >>
> >>> I hope this helps,
> >>> Kostas
> >>>
> >>> On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have just come across a weird state of operators after a restore from
> >>>> a checkpoint. After the restore, two connected operators (i.e. operator
> >>>> A is the input of operator B) ended up with the watermark of operator A
> >>>> being less than the watermark of operator B. I don't know how to
> >>>> explain this. Can it be normal, or does it signal a bug somewhere? If I
> >>>> understand Flink's checkpointing correctly, the checkpoint barrier flows
> >>>> from one operator to another, so the watermarks should be aligned.
> >>>>
> >>>> I'm running a Beam pipeline on Flink 1.8.1.
> >>>>
> >>>> Am I missing something?
> >>>>
> >>>> Many thanks for comments,
> >>>>
> >>>>     Jan
> >>>>
> >>>>
>
