[ https://issues.apache.org/jira/browse/BEAM-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16894096#comment-16894096 ]
Jan Lukavský commented on BEAM-7785:
------------------------------------
Thanks for the state example and the copy-on-write approach; I understand your
point. I think the analogy is quite good, but there might be subtle
differences, which could mean that the two problems call for different
approaches. Let me explain the differences I see between state and watermarks in
this context:
1) state has strong consistency requirements: after state is
updated, every subsequent read has to see this update (read-after-write
consistency)
2) a watermark update, on the other hand, is not required to happen instantly; it
can happen eventually
So the "more clever" locking I was referring to would be to actually
lock the processing _only if there is currently no bundle being processed_
(inside the given AppliedPTransform). That might not happen instantly, but it
should happen eventually. I think this should be enough to ensure both
consistency and performance while keeping the code complexity low. I have
nearly working code for that; I still need to verify that it does not break
anything (again :)).
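A minimal sketch of the deferred-update idea, assuming a single holder per transform. The class `DeferredWatermark` and its methods are hypothetical illustrations, not DirectRunner or WatermarkManager APIs. A watermark advance requested while any bundle is in flight is recorded and applied only when the last in-flight bundle finishes, so every bundle observes one unchanged watermark for its whole duration.

```java
/**
 * Hypothetical sketch (not DirectRunner code): a per-transform watermark holder
 * that applies updates only while no bundle is being processed.
 */
class DeferredWatermark {
  private final Object lock = new Object();
  private int bundlesInFlight = 0;
  private long currentWatermark;
  // Highest advance requested while bundles were running; null if none.
  private Long pendingWatermark = null;

  DeferredWatermark(long initial) {
    this.currentWatermark = initial;
  }

  /** Marks a bundle as started and returns the watermark it should observe. */
  public long startBundle() {
    synchronized (lock) {
      bundlesInFlight++;
      return currentWatermark;
    }
  }

  /** Marks a bundle as finished; applies a deferred update once the transform is idle. */
  public void finishBundle() {
    synchronized (lock) {
      bundlesInFlight--;
      if (bundlesInFlight == 0 && pendingWatermark != null) {
        currentWatermark = Math.max(currentWatermark, pendingWatermark);
        pendingWatermark = null;
      }
    }
  }

  /** Requests a watermark advance: applied now if idle, otherwise deferred. */
  public void advanceTo(long newWatermark) {
    synchronized (lock) {
      if (bundlesInFlight == 0) {
        currentWatermark = Math.max(currentWatermark, newWatermark);
      } else {
        pendingWatermark =
            pendingWatermark == null ? newWatermark : Math.max(pendingWatermark, newWatermark);
      }
    }
  }

  public long current() {
    synchronized (lock) {
      return currentWatermark;
    }
  }
}
```

The update happens "eventually" only if the transform periodically becomes idle. If new bundles keep starting before the last one finishes, the pending update would wait indefinitely, so a real implementation might also need to hold back new bundles while an update is pending.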
If this approach fails, I think it would be necessary to implement it as
you suggest, but it currently seems to me that this would add a lot of
complexity, because the copied watermark state would have to be correctly
referenced from each bundle (or KeyedWorkItem).
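For comparison, here is a minimal sketch of the copy-on-write alternative, assuming immutable watermark snapshots. `WatermarkSnapshot`, `SnapshotBundle` and `CopyOnWriteWatermarks` are hypothetical names. Each update publishes a new snapshot, and each bundle keeps a reference to the snapshot that was current when the bundle was created. That per-bundle reference is the extra plumbing described above.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable view of a transform's watermarks. */
final class WatermarkSnapshot {
  final long inputWatermark;
  final long outputWatermark;

  WatermarkSnapshot(long inputWatermark, long outputWatermark) {
    this.inputWatermark = inputWatermark;
    this.outputWatermark = outputWatermark;
  }
}

/** Hypothetical bundle that carries the snapshot taken when it was created. */
final class SnapshotBundle {
  final String name;
  final WatermarkSnapshot watermarks;

  SnapshotBundle(String name, WatermarkSnapshot watermarks) {
    this.name = name;
    this.watermarks = watermarks;
  }
}

/** Hypothetical copy-on-write holder: updates never mutate a published snapshot. */
final class CopyOnWriteWatermarks {
  private final AtomicReference<WatermarkSnapshot> current;

  CopyOnWriteWatermarks(WatermarkSnapshot initial) {
    this.current = new AtomicReference<>(initial);
  }

  /** Creates a bundle bound to the currently published snapshot. */
  SnapshotBundle newBundle(String name) {
    return new SnapshotBundle(name, current.get());
  }

  /** Publishes a new snapshot with an advanced (never regressing) input watermark. */
  void advanceInput(long newInput) {
    current.updateAndGet(
        s -> new WatermarkSnapshot(Math.max(s.inputWatermark, newInput), s.outputWatermark));
  }

  WatermarkSnapshot current() {
    return current.get();
  }
}
```

Bundles already running keep seeing their old snapshot, while new bundles pick up the latest one. Any code that fires timers or emits output on a bundle's behalf would have to consult that bundle's snapshot rather than the shared state.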
> DirectRunner: watermarks are updated asynchronously from bundle processing
> --------------------------------------------------------------------------
>
> Key: BEAM-7785
> URL: https://issues.apache.org/jira/browse/BEAM-7785
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Affects Versions: 2.13.0
> Reporter: Jan Lukavský
> Assignee: Jan Lukavský
> Priority: Major
> Fix For: 2.15.0
>
> Time Spent: 4h
> Remaining Estimate: 0h
>
> Watermarks are updated in QuiescenceDriver by calling fireTimers, which
> calls forceRefresh() on WatermarkManager. This creates timer bundles,
> which are then processed asynchronously in a DirectTransformExecutor.
> Because of that, watermarks (mostly input watermarks) might be updated while
> a bundle is being processed. That violates the assumption that bundle
> processing is atomic (the external conditions stay identical while the
> whole bundle is processed).
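A deterministic, single-threaded sketch of the problem described in the issue, assuming a bundle that reads a shared input watermark once per element. `NonAtomicBundleDemo` is hypothetical, and the `Runnable` merely stands in for an asynchronous `forceRefresh()` landing between elements. Because the watermark is read live, elements of the same bundle observe different values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical reproduction (not Beam code) of non-atomic bundle processing. */
final class NonAtomicBundleDemo {
  /**
   * Processes {@code elements} elements, reading the live shared watermark for each one.
   * {@code betweenElements} simulates a concurrent watermark refresh between elements.
   */
  static List<Long> processBundle(int elements, AtomicLong liveWatermark, Runnable betweenElements) {
    List<Long> observed = new ArrayList<>();
    for (int i = 0; i < elements; i++) {
      observed.add(liveWatermark.get()); // what this element "sees" as the input watermark
      betweenElements.run(); // stands in for an asynchronous forceRefresh()
    }
    return observed;
  }
}
```

With a refresh that advances the watermark by 10 after each element, a three-element bundle observes 0, 10 and 20 rather than one consistent value.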