But are they chained together? Could you provide the code from your job, at least until operator A?
On Wed, Aug 7, 2019 at 3:03 PM Jan Lukavský <je...@seznam.cz> wrote:

> Actually, operator A is intermediate, source is preceding it.
>
> On 8/7/19 2:44 PM, Kostas Kloudas wrote:
> > Hi Jan,
> >
> > After looking at the code, my point 1) is false for *intermediate* tasks
> > and if you are using a watermark assigner. This means that in these
> > cases, Flink checks that the "next" watermark is greater than the
> > "previous" one.
> >
> > But if your operator A is a source and you emit watermarks from the
> > source, then it can happen that your watermark appears to go backwards
> > on operator A, but operator B does the "correction" by discarding
> > smaller watermarks. That can explain your observation.
> >
> > Cheers,
> > Kostas
> >
> > On Wed, Aug 7, 2019 at 2:30 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> >> Hi Kostas,
> >>
> >> thanks for the reaction, comments inline.
> >>
> >> On 8/7/19 1:59 PM, Kostas Kloudas wrote:
> >>> Hi Jan,
> >>>
> >>> Two pointers that may help you explain the behavior are the following:
> >>>
> >>> 1) If you have a custom watermark generator, I do not think that Flink
> >>> checks if it emits only monotonically increasing watermarks. This is
> >>> the responsibility of the generator itself. This means that although
> >>> your operator A is topologically before operator B, operator A may
> >>> have a smaller watermark if your watermark generator allows so.
> >>
> >> I do generate watermarks by custom source, but I believe that the
> >> generated sequence is monotonic. But still, I'm not sure: even if the
> >> generated watermark actually did decrease, would that mean that the
> >> downstream operator after the source (operator A) would actually
> >> "go back in time"?
> >>
> >>> 2) Flink currently does not checkpoint the last seen watermark
> >>> (https://issues.apache.org/jira/browse/FLINK-5601).
> >>> This means that after restoring, your (event) time is assumed to be
> >>> Long.Min until the first new watermark comes.
> >>> So if you observed late data not being late anymore or something
> >>> similar, then it may not be that the two operators have different
> >>> watermarks but that after restoring event time rolls back to the
> >>> "beginning of time".
> >>
> >> I actually didn't observe any wrong or unexpected behavior, exceptions
> >> or wrong outputs. I just noticed this on Flink's WebUI and it looked
> >> strange to me. Could it be just that the WebUI showed an older
> >> watermark for operator A? What was strange is that the watermarks were
> >> on my screen long enough to take a screenshot (so for at least, say,
> >> 10 seconds the displayed watermark of operator A was less than the one
> >> of operator B). Even if watermarks are not checkpointed, would it
> >> still be possible for the watermark of operator B to be actually
> >> greater? I'm still confused about how this could happen, because (in
> >> my understanding) output watermark of operator A should be greater or
> >> equal to input watermark of B (because it takes minimum of inputs).
> >>
> >> Sorry if I'm digging into this too much, but I don't like things I
> >> cannot explain, as they might point to some bugs somewhere. :-) Or
> >> that my mental model is not aligned with reality.
> >>
> >> Jan
> >>
> >>> I hope this helps,
> >>> Kostas
> >>>
> >>> On Wed, Aug 7, 2019 at 12:11 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have just come across a weird state of operators after restore
> >>>> from checkpoint. After the restore, two operators that are connected
> >>>> (i.e. operator A is input of operator B) ended up with watermark of
> >>>> operator A being less than watermark of operator B. I don't know how
> >>>> to explain this. Can it be normal or does it signal a bug somewhere?
> >>>> If I understand Flink's checkpointing correctly, the checkpoint
> >>>> barrier flows from one operator to another, so the watermark should
> >>>> be aligned.
> >>>>
> >>>> I'm running a Beam pipeline on Flink 1.8.1.
> >>>>
> >>>> Am I missing something?
> >>>>
> >>>> Many thanks for comments,
> >>>>
> >>>> Jan
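
For readers following the thread, the three behaviors discussed above (an operator taking the minimum over its inputs, a downstream operator discarding smaller watermarks, and event time starting from the minimum long value after a restore because the last watermark is not checkpointed) can be sketched roughly as below. This is only an illustration under stated assumptions: WatermarkTracker and its methods are hypothetical names made up for this sketch, not Flink classes, and Flink's real bookkeeping may differ in detail.

```java
import java.util.Arrays;

/** Hypothetical sketch of per-operator watermark bookkeeping; NOT Flink's actual implementation. */
final class WatermarkTracker {
    // Last watermark seen on each input channel.
    private final long[] inputWatermarks;
    // Assumption: a freshly created (or restored) operator starts at the
    // "beginning of time", since the last watermark is not checkpointed.
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTracker(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Handles an incoming watermark; returns true if this operator's watermark advanced. */
    boolean onWatermark(int input, long timestamp) {
        // A watermark that goes backwards on a channel is discarded.
        if (timestamp <= inputWatermarks[input]) {
            return false;
        }
        inputWatermarks[input] = timestamp;
        // The operator's event time is the minimum over all inputs.
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkTracker operatorB = new WatermarkTracker(1);
        operatorB.onWatermark(0, 100L);
        operatorB.onWatermark(0, 50L); // smaller watermark from upstream is ignored
        System.out.println(operatorB.currentWatermark());
    }
}
```

In this sketch an upstream that emits 100 and then 50 would itself appear to go backwards, while the downstream tracker stays at 100, which matches the "correction" described for operator B.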