[
https://issues.apache.org/jira/browse/FLINK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306912#comment-17306912
]
Qingsheng Ren commented on FLINK-21817:
---------------------------------------
Thanks for the inspiring discussion!
About {{SplitAssignmentTracker}}, I agree with [~sewen] and [~kezhuw]. I think
it’s not necessary to keep it in the state of {{SourceCoordinator}}, since it
will eventually lose track of the per-subtask assignment after a global failover.
About split revoking, I’m +1 along with [~becket_qin]. Whether or not we use it
to correct split assignment, it is still useful in scenarios like load
balancing, and it gives the split enumerator more flexibility in its assignment
decisions.
Based on the opinions above, we have at least these TODOs for the current
FLIP-27 source, no matter which solution we eventually choose:
1. A mechanism for readers to report their current assignment back to the
coordinator. This can be added to {{ReaderRegistrationEvent}}.
2. An interface for passing these assignments to {{SplitEnumerator}}, like the
{{addReader(int subtaskId, List<SplitT> splits)}} mentioned by [~kezhuw]. Since
{{SplitEnumerator}} is a public interface, this change requires a FLIP.
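A rough sketch of the data flow the two TODOs imply, under heavy assumptions: {{RegistrationWithSplits}}, {{AssignmentAwareEnumerator}} and {{ModuloEnumerator}} are hypothetical names, not Flink classes; splits are modeled as plain {{Integer}} partition ids; and the owner rule {{split % parallelism}} is an assumed stand-in for whatever mapping a real enumerator uses. It only shows a reader reporting its splits on registration, the coordinator forwarding them through an {{addReader(int subtaskId, List<SplitT> splits)}}-style hook, and the enumerator comparing them against its own mapping. It does not show how revoking would work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the two TODOs: readers report their current splits on registration,
 * and the enumerator receives them via addReader(subtaskId, splits).
 * HYPOTHETICAL: none of these types exist in Flink; splits are modeled as Integers.
 */
public class AssignmentReportSketch {

    /** Hypothetical registration event that also carries the reader's restored splits. */
    static final class RegistrationWithSplits {
        final int subtaskId;
        final List<Integer> currentSplits;

        RegistrationWithSplits(int subtaskId, List<Integer> currentSplits) {
            this.subtaskId = subtaskId;
            this.currentSplits = currentSplits;
        }
    }

    /** Hypothetical enumerator hook mirroring the proposed addReader(int, List<SplitT>). */
    interface AssignmentAwareEnumerator<SplitT> {
        void addReader(int subtaskId, List<SplitT> splits);
    }

    /** Toy enumerator: records what each reader holds and flags splits on the wrong subtask. */
    static final class ModuloEnumerator implements AssignmentAwareEnumerator<Integer> {
        private final int parallelism;
        final Map<Integer, List<Integer>> reported = new HashMap<>();
        final List<Integer> misplaced = new ArrayList<>();

        ModuloEnumerator(int parallelism) {
            this.parallelism = parallelism;
        }

        @Override
        public void addReader(int subtaskId, List<Integer> splits) {
            reported.put(subtaskId, new ArrayList<>(splits));
            for (int split : splits) {
                // Assumed owner rule; a real enumerator could revoke or reassign these.
                if (split % parallelism != subtaskId) {
                    misplaced.add(split);
                }
            }
        }
    }

    /** Coordinator side (hypothetical): forward the reported splits to the enumerator. */
    static void onRegistration(RegistrationWithSplits event,
                               AssignmentAwareEnumerator<Integer> enumerator) {
        enumerator.addReader(event.subtaskId, event.currentSplits);
    }

    public static void main(String[] args) {
        ModuloEnumerator enumerator = new ModuloEnumerator(3);
        // Splits as they might look after a rescaled restore to parallelism 3.
        onRegistration(new RegistrationWithSplits(0, Arrays.asList(0, 2)), enumerator);
        onRegistration(new RegistrationWithSplits(1, Arrays.asList(4, 1)), enumerator);
        onRegistration(new RegistrationWithSplits(2, Arrays.asList(3, 5)), enumerator);
        // prints "misplaced splits: [2, 3]"
        System.out.println("misplaced splits: " + enumerator.misplaced);
    }
}
```

With the reported assignment in hand, the enumerator knows which splits sit on a non-owner subtask; those are the natural candidates for the split revoking mentioned above.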
> New Kafka Source might break subtask and split assignment upon rescale
> ----------------------------------------------------------------------
>
> Key: FLINK-21817
> URL: https://issues.apache.org/jira/browse/FLINK-21817
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.12.2
> Reporter: Kezhu Wang
> Priority: Blocker
> Fix For: 1.13.0, 1.12.3
>
>
> On restoring, splits are added back directly to {{SourceReader}} in
> {{SourceOperator}}. In a restore without rescaling, the bindings between
> splits and subtasks are preserved because
> {{RoundRobinOperatorStateRepartitioner}} does not repartition the state. But
> in a rescaled restore, these operator states are redistributed across the
> existing subtasks, which might break the assignment decided by
> {{SourceEnumerator}}.
> Taking {{KafkaSource}} as an example, the partition-to-subtask assignment is
> decided by {{KafkaSourceEnumerator.getSplitOwner}}. These mappings will break
> after rescaling.
> I pushed [a test
> case|https://github.com/kezhuw/flink/commit/9dc13cd9d7355d468a6ac8aae1b14b3a267581b6#diff-ad6e86c3757199ac3247687a71f9c34ee67b9ac743ae88a9f660950f27bec6eeR238]
> using {{KafkaSource}} for evaluation.
> I think an API addition is required to solve this in a generic and
> configurable way.
> Is this a valid issue? I am not entirely sure.
> cc [~jqin] [~sewen]
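To make the failure mode in the description concrete, here is a simplified model, not Flink code. Assumptions: {{owner()}} uses {{partition % parallelism}} as a hypothetical stand-in for {{KafkaSourceEnumerator.getSplitOwner}}, and {{redistribute()}} hands out the flattened list of restored splits in contiguous chunks, which is a simplified model of operator-state repartitioning rather than the exact {{RoundRobinOperatorStateRepartitioner}} algorithm. The point is only that repartitioning ignores the enumerator's owner rule.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of how rescaling can break an enumerator's split-to-subtask mapping.
 * ASSUMPTIONS: owner() is a hypothetical stand-in for KafkaSourceEnumerator.getSplitOwner,
 * and redistribute() is a simplified model of operator-state repartitioning.
 */
public class RescaleMismatchDemo {

    /** Hypothetical owner rule: partition p belongs to subtask p % parallelism. */
    static int owner(int partition, int parallelism) {
        return partition % parallelism;
    }

    /** Builds the per-subtask split lists as the enumerator would assign them. */
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            perSubtask.get(owner(p, parallelism)).add(p);
        }
        return perSubtask;
    }

    /**
     * Simplified repartitioning: flatten all restored splits in subtask order,
     * then hand out contiguous chunks to the new subtasks, ignoring owner().
     */
    static List<List<Integer>> redistribute(List<List<Integer>> oldState, int newParallelism) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> splits : oldState) {
            all.addAll(splits);
        }
        List<List<Integer>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int offset = 0;
        for (int i = 0; i < newParallelism; i++) {
            int count = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(offset, offset + count)));
            offset += count;
        }
        return result;
    }

    /** Counts restored splits that land on a subtask other than owner(). */
    static int countMisplaced(List<List<Integer>> state, int parallelism) {
        int misplaced = 0;
        for (int subtask = 0; subtask < state.size(); subtask++) {
            for (int p : state.get(subtask)) {
                if (owner(p, parallelism) != subtask) {
                    misplaced++;
                }
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        List<List<Integer>> before = assign(6, 2);           // parallelism 2
        List<List<Integer>> after = redistribute(before, 3); // rescaled to 3
        // prints "before: [[0, 2, 4], [1, 3, 5]], misplaced=0"
        System.out.println("before: " + before + ", misplaced=" + countMisplaced(before, 2));
        // prints "after:  [[0, 2], [4, 1], [3, 5]], misplaced=2"
        System.out.println("after:  " + after + ", misplaced=" + countMisplaced(after, 3));
    }
}
```

Without rescaling every split stays on its owner; after going from 2 to 3 subtasks, partitions 2 and 3 end up on subtasks that {{owner()}} would not pick, which is the kind of mismatch the linked test case checks for with the real {{KafkaSource}}.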
--
This message was sent by Atlassian Jira
(v8.3.4#803005)