[ 
https://issues.apache.org/jira/browse/BEAM-14387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530321#comment-17530321
 ] 

Luke Cwik commented on BEAM-14387:
----------------------------------


Run a debugger and start tracing from 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java

Note that this will enter the SplittableProcessElementsInvoker, which is an 
implementation shared across Java-based runners (e.g. Dataflow v1, Flink). I 
would check that the current bundle is being checkpointed, since that is the 
only time the restriction advances from the perspective of the runner.
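
To illustrate the point about checkpointing, here is a minimal toy model, not 
Beam's actual code. ToyRange, ToyTracker and ToyRunner are hypothetical names 
made up for this sketch. It assumes the runner only learns a new restriction 
from the residual returned by a checkpoint, so a bundle that never checkpoints 
would leave the runner holding the original restriction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical half-open range [from, to); a stand-in for a real restriction.
class ToyRange {
  final long from;
  final long to;

  ToyRange(long from, long to) {
    this.from = from;
    this.to = to;
  }

  @Override
  public String toString() {
    return "[" + from + ", " + to + ")";
  }
}

// Hypothetical tracker: claims positions and can be checkpointed.
class ToyTracker {
  private ToyRange range;
  private long lastClaimed;

  ToyTracker(ToyRange range) {
    this.range = range;
    this.lastClaimed = range.from - 1;
  }

  boolean tryClaim(long position) {
    if (position >= range.to) {
      return false; // outside the restriction, stop processing
    }
    lastClaimed = position;
    return true;
  }

  // Keeps the claimed part as the primary and returns the unclaimed residual,
  // or null when nothing is left to process.
  ToyRange checkpoint() {
    long splitAt = lastClaimed + 1;
    if (splitAt >= range.to) {
      return null;
    }
    ToyRange residual = new ToyRange(splitAt, range.to);
    range = new ToyRange(range.from, splitAt);
    return residual;
  }
}

// Hypothetical runner loop: one bundle per iteration, checkpoint after each.
class ToyRunner {
  // Returns the restriction the runner held at the start of each bundle.
  static List<String> run(ToyRange initial, int claimsPerBundle) {
    if (claimsPerBundle <= 0) {
      throw new IllegalArgumentException("claimsPerBundle must be positive");
    }
    List<String> seen = new ArrayList<>();
    ToyRange next = initial;
    while (next != null) {
      seen.add(next.toString());
      ToyTracker tracker = new ToyTracker(next);
      long position = next.from;
      for (int i = 0; i < claimsPerBundle && tracker.tryClaim(position); i++) {
        position++;
      }
      // The only place where the runner's view of the restriction changes.
      next = tracker.checkpoint();
    }
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(run(new ToyRange(0, 5), 2));
  }
}
```

In this model the runner's view moves forward only when checkpoint() hands 
back a residual, which is why it is worth confirming in the debugger that the 
checkpoint actually happens.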

> DirectRunner does not update reference to currentRestriction when running in 
> SDF
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14387
>                 URL: https://issues.apache.org/jira/browse/BEAM-14387
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Pablo Estrada
>            Assignee: Luke Cwik
>            Priority: P2
>
> I have an SDF implementation that looks like so:
>  
> {code:java}
> class MyRestrictionTracker {
>   MyRestriction restriction;
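>   MyRestriction currentRestriction() { return restriction; }
>   boolean tryClaim(MyPosition position) {
>     // Replaces the restriction with a new object on every claim.
>     this.restriction = new MyRestriction(position);
>     return true;
>   }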
> }{code}
> I ran this on the DirectRunner, and the restriction would never advance: It 
> would get stuck on the very first value.
> I also ran this on DataflowRunner, and the problem did not exist there: This 
> ran fine.
>  
> I was able to fix this on the DirectRunner (it works well on Dataflow as 
> well) by changing the restriction to be mutable. Something like this:
>  
> {code:java}
> class MyRestrictionTracker {
>   MyRestriction restriction;
>   MyRestriction currentRestriction() { return restriction; }
>   boolean tryClaim(MyPosition position) {
>     // Mutates the existing restriction object in place.
>     this.restriction.position = position;
>     return true;
>   }
> }{code}
> This looks like an execution issue with SDF on DirectRunner: The DirectRunner 
> is likely storing a reference to `currentRestriction()` and never updating it 
> as it runs.
>  
> I'm happy to fix this on the DirectRunner - I would just like to find 
> pointers : )
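
The hypothesis in the quoted description (a caller holding on to the object 
returned by `currentRestriction()`) can be sketched in plain Java. This is 
only a toy model of reference aliasing, not the DirectRunner's code. Restriction, 
ReplacingTracker, MutatingTracker and AliasingDemo are hypothetical names, and 
the "cached" variable stands in for whatever the runner might be holding.

```java
// Hypothetical restriction with a mutable field so both styles can be compared.
class Restriction {
  long position;

  Restriction(long position) {
    this.position = position;
  }
}

// Mirrors the first quoted snippet: each claim swaps in a new object.
class ReplacingTracker {
  private Restriction restriction = new Restriction(0);

  Restriction currentRestriction() {
    return restriction;
  }

  boolean tryClaim(long position) {
    restriction = new Restriction(position);
    return true;
  }
}

// Mirrors the second quoted snippet: each claim mutates the same object.
class MutatingTracker {
  private final Restriction restriction = new Restriction(0);

  Restriction currentRestriction() {
    return restriction;
  }

  boolean tryClaim(long position) {
    restriction.position = position;
    return true;
  }
}

class AliasingDemo {
  // Position seen through a reference cached before the claim (replacing style).
  static long observedWithReplacing(long claimed) {
    ReplacingTracker tracker = new ReplacingTracker();
    Restriction cached = tracker.currentRestriction(); // assumed to be read once
    tracker.tryClaim(claimed);
    return cached.position; // still the old object
  }

  // Position seen through a reference cached before the claim (mutating style).
  static long observedWithMutating(long claimed) {
    MutatingTracker tracker = new MutatingTracker();
    Restriction cached = tracker.currentRestriction(); // assumed to be read once
    tracker.tryClaim(claimed);
    return cached.position; // same object, so the update is visible
  }

  public static void main(String[] args) {
    System.out.println("replacing: " + observedWithReplacing(5)); // prints "replacing: 0"
    System.out.println("mutating: " + observedWithMutating(5)); // prints "mutating: 5"
  }
}
```

If the runner behaves like the cached reference here, that would match the 
reported symptom: the replacing tracker appears stuck on its first value, 
while the mutating tracker appears to advance.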



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
