[ https://issues.apache.org/jira/browse/BEAM-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels updated BEAM-9161:
-------------------------------------
    Description: 
In the DoFnOperator and its subclass ExecutableStageDoFnOperator there are
effectively two processing threads:

(1) The main processing thread for processing elements and watermarks
(2) A timer thread to support ending bundles after a timeout to optimize for
latency

Although the code was written under a different assumption in the past, the two
threads never interleave: only one is active at a time. This is ensured by
Flink's "checkpointLock", which is acquired for every method called on the
operator, such as processElement, processWatermark, and snapshotState. It is also
acquired when timers registered via Flink's ProcessingTimeService fire, as is the
case for (2).
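
To make the serialization argument concrete, here is a minimal, self-contained
Java model. It is a sketch, not Beam or Flink code: a plain Object stands in
for Flink's checkpoint lock, processElement starts a bundle on the main thread,
and a ScheduledExecutorService plays the role of the bundle timeout timer (2),
taking the same lock before it finishes the bundle. All names
(CheckpointLockDemo, onBundleTimer, finishBundleIfActive) are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CheckpointLockDemo {
  // Hypothetical stand-in for Flink's checkpoint lock.
  private final Object checkpointLock = new Object();

  // Bundle state shared by the main thread and the timer thread.
  private boolean bundleActive = false;
  int bundlesStarted = 0;
  int bundlesFinished = 0;

  // Main thread (1): called for every element, always under the lock.
  void processElement(int element) {
    synchronized (checkpointLock) {
      if (!bundleActive) {
        bundleActive = true;
        bundlesStarted++;
      }
    }
  }

  // Timer thread (2): finishes the current bundle after a timeout, also under the lock.
  void onBundleTimer() {
    synchronized (checkpointLock) {
      finishBundleIfActive();
    }
  }

  // Must be called with the lock held; closes the active bundle at most once.
  private void finishBundleIfActive() {
    if (bundleActive) {
      bundleActive = false;
      bundlesFinished++;
    }
  }

  // Runs the element loop while a timer periodically finishes bundles.
  static CheckpointLockDemo run(int elements) {
    CheckpointLockDemo op = new CheckpointLockDemo();
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    timer.scheduleAtFixedRate(op::onBundleTimer, 0, 1, TimeUnit.MILLISECONDS);
    for (int i = 0; i < elements; i++) {
      op.processElement(i);
    }
    timer.shutdown(); // cancels further periodic runs
    try {
      if (!timer.awaitTermination(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timer did not stop");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    // Close whatever bundle is still open, as the operator would on shutdown.
    synchronized (op.checkpointLock) {
      op.finishBundleIfActive();
    }
    return op;
  }

  public static void main(String[] args) {
    CheckpointLockDemo op = run(100_000);
    System.out.println(op.bundlesStarted + " started, " + op.bundlesFinished + " finished");
  }
}
```

Because both paths hold the same lock, every bundle that is started is finished
exactly once in this model; removing either synchronized block would let the two
threads race on bundleActive.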

We've seen bundles being closed multiple times, resulting in exceptions like
"Already closed". Very rarely, we've also seen dead bundle ids resurrect, for
which we could find no explanation other than an inconsistent view of the
current bundle between the two threads.

  was:
The issue results in exceptions like "Already closed" due to a concurrent close
of the remote input receivers from (1) the main thread and (2) the thread for
finishing the bundle. In combination with the {{bundleFinishedCallback}}, it may
also have caused a slowdown by closing a bundle when that was not needed.

We should make sure that only one thread at a time can start or finish a bundle.
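One way to guarantee that only one thread at a time can start or finish a bundle
is to check and clear the active bundle under a single lock, so that a second
finish becomes a no-op instead of a second close. This Java sketch is
hypothetical: SingleBundleGuard and its Receiver stand-in are not Beam classes,
and Receiver merely throws "Already closed" on a second close to mimic the
reported exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SingleBundleGuard {
  // Hypothetical stand-in for a remote input receiver that rejects a second close.
  static final class Receiver {
    private boolean closed = false;

    void close() {
      if (closed) {
        throw new IllegalStateException("Already closed");
      }
      closed = true;
    }
  }

  private final Object lock = new Object();
  private Receiver currentBundle; // null while no bundle is active
  private int closes = 0;

  // Starts a bundle only if none is active.
  void startBundleIfNeeded() {
    synchronized (lock) {
      if (currentBundle == null) {
        currentBundle = new Receiver();
      }
    }
  }

  // Reads and clears the active bundle under the same lock, so concurrent
  // callers (main thread, timer thread) close each receiver at most once.
  void finishBundle() {
    synchronized (lock) {
      Receiver bundle = currentBundle;
      if (bundle == null) {
        return;
      }
      currentBundle = null;
      bundle.close();
      closes++;
    }
  }

  int closes() {
    synchronized (lock) {
      return closes;
    }
  }

  // Starts one bundle, lets several threads race to finish it, and returns
  // any exceptions the threads hit.
  static List<Throwable> raceToFinish(SingleBundleGuard guard, int threads) {
    guard.startBundleIfNeeded();
    ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(() -> {
        try {
          guard.finishBundle();
        } catch (Throwable e) {
          errors.add(e);
        }
      });
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return new ArrayList<>(errors);
  }

  public static void main(String[] args) {
    SingleBundleGuard guard = new SingleBundleGuard();
    List<Throwable> errors = raceToFinish(guard, 8);
    System.out.println("closes=" + guard.closes() + ", errors=" + errors.size());
  }
}
```

However many threads race, exactly one of them observes a non-null bundle and
closes it; the rest return without touching the receiver.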


> Inconsistent view of current ActiveBundle from main and bundle timer thread
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-9161
>                 URL: https://issues.apache.org/jira/browse/BEAM-9161
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian Jira
(v8.3.4#803005)