[ https://issues.apache.org/jira/browse/BEAM-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894096#comment-16894096 ]

Jan Lukavský commented on BEAM-7785:
------------------------------------

Thanks for the state example and the copy-on-write approach; I understand your
point. I think the analogy is quite good, but there may be subtle differences,
which might actually mean that the two problems call for different approaches.
Let me explain the differences I see between state and watermarks in this
context:

 1) state has strong consistency requirements: after state is updated, every
subsequent read has to see the update (read-after-write consistency)
 2) a watermark update, on the other hand, does not have to take effect
instantly; it only has to happen eventually
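To make the distinction concrete, here is a minimal Java sketch of the two
contracts. The class names `ReadAfterWriteCell` and `EventualWatermark` are
hypothetical and are not part of Beam or the DirectRunner. For simplicity,
watermarks are modelled as plain `long` millisecond timestamps rather than Beam's
`Instant`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of state semantics: a write is visible to every
// subsequent read (read-after-write consistency).
final class ReadAfterWriteCell<T> {
  private T value;

  synchronized void write(T newValue) {
    value = newValue;
  }

  synchronized T read() {
    return value;
  }
}

// Hypothetical model of watermark semantics: an advance is only recorded as
// pending and becomes visible at some later publish point.
final class EventualWatermark {
  private final AtomicLong visible = new AtomicLong(Long.MIN_VALUE);
  private final AtomicLong pending = new AtomicLong(Long.MIN_VALUE);

  // Record a proposed watermark; taking the max means it never moves backwards.
  void propose(long millis) {
    pending.accumulateAndGet(millis, Math::max);
  }

  // Make the pending value visible; this may run "eventually", e.g. between bundles.
  void publish() {
    visible.accumulateAndGet(pending.get(), Math::max);
  }

  long current() {
    return visible.get();
  }
}

class ConsistencyContractsDemo {
  public static void main(String[] args) {
    ReadAfterWriteCell<String> state = new ReadAfterWriteCell<>();
    state.write("a");
    System.out.println(state.read()); // a

    EventualWatermark wm = new EventualWatermark();
    wm.propose(100L);
    System.out.println(wm.current() == Long.MIN_VALUE); // true: not visible yet
    wm.publish();
    System.out.println(wm.current()); // 100
  }
}
```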

Given that, the "more clever" locking I was referring to would lock the
processing _only when there is currently no bundle being processed_ (within the
given AppliedPTransform). That might not happen instantly, but it should happen
eventually. I think this should be enough to ensure both consistency and
performance while keeping code complexity low. I have nearly working code for
this; I still need to verify that it does not break anything (again :)).
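A minimal Java sketch of the locking idea, under stated assumptions: the class
`DeferredWatermarkUpdater` and its methods are hypothetical (this is not the
actual DirectRunner code), one instance is assumed per AppliedPTransform, and the
watermark is a `long`. A watermark update that arrives while a bundle is running
is parked and applied only once no bundle of that transform is in flight. As a
result, each bundle observes a constant input watermark.

```java
// Hypothetical sketch: one instance per AppliedPTransform. Watermark updates
// are applied only while no bundle of that transform is being processed.
final class DeferredWatermarkUpdater {
  private long inputWatermark = Long.MIN_VALUE;
  private Long pendingWatermark = null; // latest update not yet applied
  private int bundlesInFlight = 0;

  // Called from the (hypothetical) watermark-refresh path.
  synchronized void proposeWatermark(long millis) {
    long base = pendingWatermark == null ? inputWatermark : pendingWatermark;
    pendingWatermark = Math.max(base, millis);
    applyIfIdle();
  }

  // Returns the watermark the bundle observes for its whole lifetime.
  synchronized long startBundle() {
    applyIfIdle(); // an idle moment is a safe point to apply a pending update
    bundlesInFlight++;
    return inputWatermark;
  }

  synchronized void finishBundle() {
    if (bundlesInFlight == 0) {
      throw new IllegalStateException("finishBundle without startBundle");
    }
    bundlesInFlight--;
    applyIfIdle();
  }

  synchronized long currentWatermark() {
    return inputWatermark;
  }

  private void applyIfIdle() {
    if (bundlesInFlight == 0 && pendingWatermark != null) {
      inputWatermark = pendingWatermark;
      pendingWatermark = null;
    }
  }
}

class DeferredWatermarkDemo {
  public static void main(String[] args) {
    DeferredWatermarkUpdater updater = new DeferredWatermarkUpdater();
    updater.proposeWatermark(10L);     // idle: applied immediately
    long seen = updater.startBundle(); // bundle observes 10
    updater.proposeWatermark(20L);     // bundle running: deferred
    System.out.println(seen + " " + updater.currentWatermark()); // 10 10
    updater.finishBundle();            // idle again: 20 is applied
    System.out.println(updater.currentWatermark()); // 20
  }
}
```

If bundles of the same transform overlap continuously, the in-flight count may
rarely reach zero. This is the "not instantly, but eventually" trade-off
described above.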

If this approach fails, I think it would be necessary to implement it as you
suggest. However, it currently seems to me that this would add a lot of
complexity, because the copied watermark state would have to be correctly
referenced from each bundle (or KeyedWorkItem).
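For comparison, here is a minimal Java sketch of the copy-on-write alternative.
The names `WatermarkSnapshot`, `SnapshotWatermarks` and `BundleContext` are
hypothetical and are not Beam APIs. Each update publishes a new immutable
snapshot instead of mutating the current one. Each bundle keeps a reference to
the snapshot it started with, and that reference is the piece that would have to
be carried along with every bundle (or KeyedWorkItem).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable view of the watermarks relevant to one transform.
final class WatermarkSnapshot {
  final long inputWatermark;
  final long outputWatermark;

  WatermarkSnapshot(long inputWatermark, long outputWatermark) {
    this.inputWatermark = inputWatermark;
    this.outputWatermark = outputWatermark;
  }
}

// Hypothetical holder: updates never mutate a snapshot, they replace it.
final class SnapshotWatermarks {
  private final AtomicReference<WatermarkSnapshot> current =
      new AtomicReference<>(new WatermarkSnapshot(Long.MIN_VALUE, Long.MIN_VALUE));

  void advanceInput(long millis) {
    current.updateAndGet(
        s -> new WatermarkSnapshot(Math.max(s.inputWatermark, millis), s.outputWatermark));
  }

  WatermarkSnapshot snapshot() {
    return current.get();
  }
}

// Hypothetical bundle wrapper: the snapshot taken at bundle start is the only
// watermark view the bundle's processing code consults.
final class BundleContext {
  final WatermarkSnapshot watermarks;

  BundleContext(SnapshotWatermarks holder) {
    this.watermarks = holder.snapshot();
  }
}

class CopyOnWriteDemo {
  public static void main(String[] args) {
    SnapshotWatermarks holder = new SnapshotWatermarks();
    holder.advanceInput(10L);
    BundleContext bundle = new BundleContext(holder);
    holder.advanceInput(20L); // update arrives while the bundle "runs"
    System.out.println(bundle.watermarks.inputWatermark); // 10
    System.out.println(holder.snapshot().inputWatermark); // 20
  }
}
```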

> DirectRunner: watermarks are updated asynchronously from bundle processing
> --------------------------------------------------------------------------
>
>                 Key: BEAM-7785
>                 URL: https://issues.apache.org/jira/browse/BEAM-7785
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.13.0
>            Reporter: Jan Lukavský
>            Assignee: Jan Lukavský
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Watermarks on WatermarkManager are updated in QuiescenceDriver (by calling
> fireTimers, which calls forceRefresh()). This results in the creation of timer
> bundles, which are then processed asynchronously by DirectTransformExecutor.
> Because of that, watermarks (mostly input watermarks) might be updated while a
> bundle is being processed. That violates the assumption that bundle processing
> should be atomic (with identical external conditions during processing of the
> whole bundle).



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)