[
https://issues.apache.org/jira/browse/FLINK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307079#comment-17307079
]
Kezhu Wang commented on FLINK-21817:
------------------------------------
+1 for gradual procedure.
{quote} But we may even skip this for now, and not track splits in the Kafka
Source for this release. There is no functionality now that needs this tracking
{quote}
I actually could not give it a name for configuring, though old Kafka consumer
do respect a same assignment.
For (2), did you means {{KafkaSourceEnumerator.readerIdToSplitAssignments}} or
{{KafkaSourceEnumState.currentAssignment}} [~sewen] ? This tracking currently
does not hurt correctness, but still worth to drop. It also demands special
care as it is stored in state.
Hmm, I guess (1) also does not hurt correctness for now. Both (1), (2) are just
unnecessary atm :(. Anyway, worth to fix.
[~renqs] I guess with help of default method and
{{SplitEnumerator.addSplitsBack}}(fallback to this if default is not overrode),
it could be a no breaking change.
> New Kafka Source might break subtask and split assignment upon rescale
> ----------------------------------------------------------------------
>
> Key: FLINK-21817
> URL: https://issues.apache.org/jira/browse/FLINK-21817
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.12.2
> Reporter: Kezhu Wang
> Priority: Blocker
> Fix For: 1.13.0, 1.12.3
>
>
> On restoring, splits are add back directly to {{SourceReader}} in
> {{SourceOperator}}. In no rescaled restoring, bindings between splits and
> subtasks are preserved due to no repartition in
> {{RoundRobinOperatorStateRepartitioner}}. But in rescaled restoring, these
> operator states will be redistributed cross existing subtasks. This might
> break possible assignment from {{SourceEnumerator}}.
> Given {{KafkaSource}} as an example, the partition to subtask assignment is
> decided by {{KafkaSourceEnumerator.getSplitOwner}}. The mappings will break
> after rescaling.
> I pushed [a test
> case|https://github.com/kezhuw/flink/commit/9dc13cd9d7355d468a6ac8aae1b14b3a267581b6#diff-ad6e86c3757199ac3247687a71f9c34ee67b9ac743ae88a9f660950f27bec6eeR238]
> using {{KafkaSource}} for evaluation.
> I think it requires api addition to solve in generic and configurable way.
> Is it a valid issue ? I am not that sure.
> cc [~jqin] [~sewen]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)