[ https://issues.apache.org/jira/browse/FLINK-38131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Renxiang Zhou updated FLINK-38131:
----------------------------------
    Description: 
FLINK-23475 proposes how to distribute Broadcast state to new operator
instances so that new instances do not end up with empty Broadcast state when
some operators in the DAG have finished and the task is restarted without
changing the parallelism. However, the current implementation seems to have a
problem in determining whether any operator has finished.

As the picture below shows, `isPartiallyReported` compares BitSet's
`cardinality` with its `size`. `cardinality` is the number of operator subtasks
that contain the Broadcast state, whereas `size` only returns the number of bits
of space actually in use by this BitSet, i.e. its allocated capacity, not the
parallelism. Comparing these two values makes no sense: `size` is generally
larger than `cardinality` even when no operator has finished, so the Broadcast
state is redistributed unnecessarily although the parallelism has not changed.
I think we should compare against the `parallelism` passed to the constructor
instead of `reportedSubtaskIndices.size()`.
!image-2025-07-22-17-34-22-479.png|width=528,height=334!
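A minimal, self-contained Java sketch of the problem, assuming a simplified tracker that records reporting subtasks in a `BitSet`. The class `BroadcastReportTracker` and its methods are hypothetical and do not reproduce Flink's actual code; the exact form of the original comparison (`<`) is also an assumption. With parallelism 4 and every subtask reporting, `size()` (64 on OpenJDK) still exceeds `cardinality()` (4), so the size-based check reports "partially reported", while the parallelism-based check does not.

```java
import java.util.BitSet;

// Hypothetical, simplified model of the check discussed in FLINK-38131.
// Names are illustrative only; this is not Flink's actual implementation.
class BroadcastReportTracker {
    private final int parallelism;
    // Bit i is set once subtask i has reported its Broadcast state.
    private final BitSet reportedSubtaskIndices;

    BroadcastReportTracker(int parallelism) {
        this.parallelism = parallelism;
        this.reportedSubtaskIndices = new BitSet(parallelism);
    }

    public void report(int subtaskIndex) {
        reportedSubtaskIndices.set(subtaskIndex);
    }

    // Problematic variant (assumed shape): size() is the BitSet's allocated
    // capacity in bits, not the number of subtasks.
    public boolean isPartiallyReportedBySize() {
        return reportedSubtaskIndices.cardinality() < reportedSubtaskIndices.size();
    }

    // Proposed variant: compare against the parallelism from the constructor.
    public boolean isPartiallyReported() {
        return reportedSubtaskIndices.cardinality() < parallelism;
    }

    public static void main(String[] args) {
        // No finished subtasks: all 4 subtasks report Broadcast state.
        BroadcastReportTracker allReported = new BroadcastReportTracker(4);
        for (int i = 0; i < 4; i++) {
            allReported.report(i);
        }
        System.out.println(allReported.reportedSubtaskIndices.size()); // 64 on OpenJDK
        System.out.println(allReported.isPartiallyReportedBySize());  // true on OpenJDK: misfires
        System.out.println(allReported.isPartiallyReported());        // false

        // Subtask 3 has finished, so only subtasks 0..2 report.
        BroadcastReportTracker oneFinished = new BroadcastReportTracker(4);
        for (int i = 0; i < 3; i++) {
            oneFinished.report(i);
        }
        System.out.println(oneFinished.isPartiallyReported());        // true
    }
}
```

The parallelism-based check only reports a partial state when some subtask actually failed to report, which is the situation FLINK-23475 is meant to handle.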

 


> Repartition BroadcastState with finished operators
> --------------------------------------------------
>
>                 Key: FLINK-38131
>                 URL: https://issues.apache.org/jira/browse/FLINK-38131
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Renxiang Zhou
>            Priority: Major
>             Fix For: 2.1.0
>
>         Attachments: image-2025-07-22-17-34-22-479.png
>
>
> FLINK-23475 proposes how to distribute Broadcast state to new operator
> instances so that new instances do not end up with empty Broadcast state when
> some operators in the DAG have finished and the task is restarted without
> changing the parallelism. However, the current implementation seems to have a
> problem in determining whether any operator has finished.
> As the picture below shows, `isPartiallyReported` compares BitSet's
> `cardinality` with its `size`. `cardinality` is the number of operator
> subtasks that contain the Broadcast state, whereas `size` only returns the
> number of bits of space actually in use by this BitSet, i.e. its allocated
> capacity, not the parallelism. Comparing these two values makes no sense:
> `size` is generally larger than `cardinality` even when no operator has
> finished, so the Broadcast state is redistributed unnecessarily although the
> parallelism has not changed. I think we should compare against the
> `parallelism` passed to the constructor instead of `reportedSubtaskIndices.size()`.
> !image-2025-07-22-17-34-22-479.png|width=528,height=334!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
