[ 
https://issues.apache.org/jira/browse/FLINK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305961#comment-17305961
 ] 

Qingsheng Ren edited comment on FLINK-21817 at 3/23/21, 2:07 AM:
-----------------------------------------------------------------

Thanks for the ticket [~kezhuw]! I took a look of new source's code, and this 
bug does exist when the job rescales. 

Some possible solutions: 

1. SourceOperator reports its restored splits back to {{SourceCoordinator}} at 
registration, and {{SourceCoordinator}} updates its {{SplitsAssignmentTracker}} 
in the context. 

2. SourceOperator reports its restored splits back to {{SourceCoordinator}} at 
registration, and if these splits don't match with the 
{{SplitsAssignmentTracker}}, {{SourceCoordinator}} will revoke them and 
re-assign correct splits to the reader. 

3. Instead of using {{RoundRobinOperatorStateRepartitioner}}, we can design a 
new partitioner, which will distribute splits according to the assignment 
tracked by {{SourceCoordinator}}. 

Personally I prefer the second one, because as the leader of {{SourceReader}}, 
{{SourceCoordinator}} holds the ground truth. For the first option, round-robin 
style partitioning might break the decision made by {{SourceEnumerator}}. Also 
I think the second one is easier to implement compared to the third solution.  

In order to make the second solution work, we also needs a split revocation 
mechanism to revoke mis-assigned splits after rescaling. 


was (Author: renqs):
Thanks for the ticket [~kezhuw]! I took a look of new source's code, and this 
bug does exist when the job rescales. 

Some possible solutions: 

1. SourceOperator reports its restored splits back to {{SourceCoordinator}} at 
registration, and {{SourceCoordinator}} updates its {{SplitsAssignmentTracker}} 
in the context. 

2. SourceOperator reports its restored splits back to {{SourceCoordinator}} at 
registration, and if these splits don't match with the 
{{SplitsAssignmentTracker}}, {{SourceCoordinator}} will revoke them and 
re-assign correct splits to the reader. 

3. Instead of using {{RoundRobinOperatorStateRepartitioner}}, we can design a 
new partitioner, which will distribute splits according to the assignment 
tracked by {{SourceCoordinator}}. 

Personally I prefer the second one, because as the leader of {{SourceReader}}, 
{{SourceOperator}} holds the ground truth. For the first option, round-robin 
style partitioning might break the decision made by {{SourceEnumerator}}. Also 
I think the second one is easier to implement compared to the third solution.  

In order to make the second solution work, we also needs a split revocation 
mechanism to revoke mis-assigned splits after rescaling. 

> New Kafka Source might break subtask and split assignment upon rescale
> ----------------------------------------------------------------------
>
>                 Key: FLINK-21817
>                 URL: https://issues.apache.org/jira/browse/FLINK-21817
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.2
>            Reporter: Kezhu Wang
>            Priority: Blocker
>             Fix For: 1.13.0, 1.12.3
>
>
> On restoring, splits are add back directly to {{SourceReader}} in 
> {{SourceOperator}}. In no rescaled restoring, bindings between splits and 
> subtasks are preserved due to no repartition in 
> {{RoundRobinOperatorStateRepartitioner}}. But in rescaled restoring, these 
> operator states will be redistributed cross existing subtasks. This might 
> break possible assignment from {{SourceEnumerator}}.
> Given {{KafkaSource}} as an example, the partition to subtask assignment is 
> decided by {{KafkaSourceEnumerator.getSplitOwner}}. The mappings will break 
> after rescaling.
> I pushed [a test 
> case|https://github.com/kezhuw/flink/commit/9dc13cd9d7355d468a6ac8aae1b14b3a267581b6#diff-ad6e86c3757199ac3247687a71f9c34ee67b9ac743ae88a9f660950f27bec6eeR238]
>  using {{KafkaSource}} for evaluation.
> I think it requires api addition to solve in generic and configurable way.
> Is it a valid issue ? I am not that sure.
> cc  [~jqin] [~sewen]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to