[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005465#comment-16005465
 ] 

Eugene Kirpichov edited comment on BEAM-2140 at 5/10/17 9:22 PM:
-----------------------------------------------------------------

Aljoscha - SDF code does not inspect watermarks.
Here's what should really happen when you apply an SDF to a BoundedSource that 
contains exactly 1 element (with more elements, it's more of the same).

1. We read an element from the source, and as it goes through the SDF 
expansion, it ends up in ProcessFn.
2. ProcessFn processes this element and its restriction, and if there's a 
residual restriction (checkpoint), then it sets a watermark hold and sets a 
timer to continue the processing.
3. The BoundedSource is done, so its watermark progresses to infinity - but 
this is fine. The input watermark of ProcessFn does NOT progress to infinity 
just yet, because ProcessFn has set a watermark hold! (If it hadn't set the 
hold, its input watermark would also progress to infinity, and the timer would 
be considered late and hence dropped.)
4. The timer set by ProcessFn fires, and it processes (calls ProcessElement) 
some more; again possibly setting a watermark hold and setting another timer to 
continue the processing. And so on.
5. Eventually the ProcessElement call finishes without producing a residual 
restriction. In that case, ProcessFn (a) clears the watermark hold and (b) does 
NOT set a continuation timer.
6. After that, the watermark of ProcessFn itself progresses to infinity 
(because there's no hold anymore) and the pipeline terminates.
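
The six steps above can be sketched as a toy simulation. This is only an 
illustrative model in plain Python, not Beam or Flink code: ToyProcessFn, run, 
the chunk parameter and the (start, end) offset-range restriction are all 
hypothetical names made up for this sketch, and the watermark is modelled as 
simply min(upstream watermark, hold).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

INFINITY = float("inf")


@dataclass
class ToyProcessFn:
    """Toy stand-in for ProcessFn (hypothetical, not the Beam class)."""
    chunk: int                                   # offsets handled per ProcessElement call (assumed)
    hold: Optional[float] = None                 # current watermark hold, if any
    timer_set: bool = False                      # is a continuation timer pending?
    residual: Optional[Tuple[int, int]] = None   # saved residual restriction
    log: List[str] = field(default_factory=list)

    def process(self, restriction, timestamp):
        """One ProcessElement call over part of the restriction (steps 2, 4, 5)."""
        start, end = restriction
        stop = min(start + self.chunk, end)
        self.log.append(f"processed [{start}, {stop})")
        if stop < end:
            # Residual restriction left: set a hold and a continuation timer.
            self.residual = (stop, end)
            self.hold = timestamp
            self.timer_set = True
        else:
            # Done: clear the hold and do NOT set another timer.
            self.residual = None
            self.hold = None
            self.timer_set = False

    def watermark(self, upstream_watermark):
        """Watermark as seen at ProcessFn: held back while a hold is set."""
        if self.hold is None:
            return upstream_watermark
        return min(upstream_watermark, self.hold)


def run(restriction, chunk, element_timestamp=0.0):
    fn = ToyProcessFn(chunk=chunk)
    fn.process(restriction, element_timestamp)   # steps 1-2
    upstream = INFINITY                          # step 3: the source is done
    observed = [fn.watermark(upstream)]
    while fn.timer_set:                          # step 4: the timer fires
        fn.timer_set = False
        fn.process(fn.residual, element_timestamp)
        observed.append(fn.watermark(upstream))
    return fn.log, observed                      # step 6: last watermark is infinity
```

Calling run((0, 5), chunk=2) processes the restriction in three calls; the 
observed watermark stays at the element's timestamp while a hold is set and 
only reaches infinity after the last call clears it.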

I suspect that in the Flink implementation, something is going wrong between 
steps 3 and 4. E.g. maybe the watermark hold isn't working (i.e. isn't 
preventing the watermark of ProcessFn from progressing to infinity); or maybe 
somehow the processing-time timer gets dropped for a different reason.
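
To make the first suspected failure mode concrete, here is a minimal sketch 
in plain Python (again a toy model, not runner code). The function name 
chunks_processed and the hold_respected flag are hypothetical, and the 
lateness rule (a timer is dropped when its timestamp is behind the watermark) 
is a simplifying assumption for illustration only.

```python
INFINITY = float("inf")


def chunks_processed(total, chunk, hold_respected):
    """Count ProcessElement calls in a toy model of the SDF loop.

    total: size of the restriction (assumed offset range [0, total)).
    chunk: offsets handled per call (assumed).
    hold_respected: whether the runner honours the watermark hold.
    """
    position = 0
    calls = 0
    timer_timestamp = 0.0  # the continuation timer carries the element's timestamp (assumed)
    while True:
        position = min(position + chunk, total)
        calls += 1
        if position >= total:
            return calls  # no residual restriction: finished normally
        # Residual left: ProcessFn sets a hold and a timer. The source is
        # already done, so the upstream watermark is at infinity.
        hold = timer_timestamp
        watermark = min(INFINITY, hold) if hold_respected else INFINITY
        if timer_timestamp < watermark:
            # Assumed lateness rule: the timer is behind the watermark, so it
            # is dropped and the residual is never processed.
            return calls
```

With a working hold, chunks_processed(5, 2, hold_respected=True) makes all 
three calls; with the hold ignored, processing stops after the first call, 
which is the kind of symptom described above.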


> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



