[ https://issues.apache.org/jira/browse/FLINK-31901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhipeng Zhang updated FLINK-31901:
----------------------------------
Description:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until all broadcast inputs have been processed. Once the broadcast variables are ready, the cached records are processed first, followed by the newly arrived records.

Processing the cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, when many records are cached, processing them can take a long time and can block the checkpoint barriers.

If we run the code snippet here[1], we get logs like the following:
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1 at time: 1682319149462
processed cached records, cnt: 10000 at time: 1682319149569
processed cached records, cnt: 20000 at time: 1682319149614
processed cached records, cnt: 30000 at time: 1682319149655
processed cached records, cnt: 40000 at time: 1682319149702
processed cached records, cnt: 50000 at time: 1682319149746
processed cached records, cnt: 60000 at time: 1682319149781
processed cached records, cnt: 70000 at time: 1682319149891
processed cached records, cnt: 80000 at time: 1682319150011
processed cached records, cnt: 90000 at time: 1682319150116
processed cached records, cnt: 100000 at time: 1682319150199
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2 at time: 1682319150378
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3 at time: 1682319150606
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4 at time: 1682319150704
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5 at time: 1682319150785
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6 at time: 1682319150859
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7 at time: 1682319150935
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 8 at time: 1682319151007
{code}
We can see that no checkpoint is taken from line#2 to line#11: the barriers are blocked until all cached elements are processed. This takes ~600ms, much longer than the checkpoint interval (100ms), so checkpoint 2 is only taken ~900ms after checkpoint 1.

[1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

was:
Currently `BroadcastUtils#withBroadcast` caches the non-broadcast input until all broadcast inputs have been processed. Once the broadcast variables are ready, the cached records are processed first, followed by the newly arrived records.

Processing the cached elements is invoked via `Input#processElement` and `Input#processWatermark`. However, when many records are cached, processing them can take a long time and can block the checkpoint barriers.

If we run the code snippet here[1], we get logs like the following:
{code:java}
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 1
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 2
processed cached records, cnt: 10000
processed cached records, cnt: 20000
processed cached records, cnt: 30000
processed cached records, cnt: 40000
processed cached records, cnt: 50000
processed cached records, cnt: 60000
processed cached records, cnt: 70000
processed cached records, cnt: 80000
processed cached records, cnt: 90000
processed cached records, cnt: 100000
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 3
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 4
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 5
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 6
OneInputBroadcastWrapperOperator doing checkpoint with checkpoint id: 7
{code}
We can see that no checkpoint is taken from line#3 to line#12: the barriers are blocked until all cached elements are processed.
[1]https://github.com/zhipeng93/flink-ml/tree/FLINK-31901-demo-case

> AbstractBroadcastWrapperOperator should not block checkpoint barriers when processing cached records
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31901
>                 URL: https://issues.apache.org/jira/browse/FLINK-31901
>             Project: Flink
>          Issue Type: Improvement
>          Components: Library / Machine Learning
>            Reporter: Zhipeng Zhang
>            Priority: Major
>             Fix For: ml-2.3.0
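The blocking behaviour in the description can be illustrated with a toy, single-threaded task loop in Java, loosely modelled on a task mailbox where each action runs to completion before the next one starts. This is a hypothetical sketch, not Flink's `MailboxExecutor` API and not necessarily how this issue was fixed; all names (`CachedRecordDrainSketch`, `TaskLoop`, `drainAll`, `drainInBatches`, `simulate`) are made up for illustration. It contrasts draining every cached record in one action, where a checkpoint queued in the meantime has to wait for the whole drain, with draining in bounded batches that re-enqueue the remainder, so a queued checkpoint can run between batches.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical toy model: none of these names are Flink or Flink ML APIs.
public class CachedRecordDrainSketch {

    // Record a progress event every 10,000 records, like the logs in the description.
    static final int LOG_EVERY = 10_000;

    // Single-threaded FIFO task loop: each action runs to completion before the next one starts.
    static final class TaskLoop {
        private final Queue<Runnable> actions = new ArrayDeque<>();
        final List<String> events = new ArrayList<>();

        void submit(Runnable action) {
            actions.add(action);
        }

        void runUntilIdle() {
            Runnable next;
            while ((next = actions.poll()) != null) {
                next.run();
            }
        }
    }

    // "Processes" cached records (start, end] and records a progress event every LOG_EVERY records.
    static void process(TaskLoop loop, int start, int end) {
        for (int i = start + 1; i <= end; i++) {
            if (i % LOG_EVERY == 0) {
                loop.events.add("processed cached records, cnt: " + i);
            }
        }
    }

    // Blocking variant: a single action drains the whole cache.
    static void drainAll(TaskLoop loop, int total) {
        loop.submit(() -> process(loop, 0, total));
    }

    // Non-blocking variant: each action drains at most batchSize records and then
    // re-submits the remainder to the back of the queue, behind any queued checkpoint.
    static void drainInBatches(TaskLoop loop, int start, int total, int batchSize) {
        loop.submit(() -> {
            int end = Math.min(start + batchSize, total);
            process(loop, start, end);
            if (end < total) {
                drainInBatches(loop, end, total, batchSize);
            }
        });
    }

    // Starts draining the cache, then enqueues a checkpoint while the drain is still pending.
    public static List<String> simulate(boolean batched, int total, int batchSize) {
        TaskLoop loop = new TaskLoop();
        if (batched) {
            drainInBatches(loop, 0, total, batchSize);
        } else {
            drainAll(loop, total);
        }
        loop.submit(() -> loop.events.add("checkpoint 2"));
        loop.runUntilIdle();
        return loop.events;
    }

    public static void main(String[] args) {
        System.out.println("drain all:  " + simulate(false, 30_000, 10_000));
        System.out.println("in batches: " + simulate(true, 30_000, 10_000));
    }
}
{code}

With `drainAll`, "checkpoint 2" is recorded only after all 30,000 cached records; with `drainInBatches` (batch size 10,000) it is recorded right after the first batch. The batch size trades drain throughput against how long a barrier can be delayed.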