[ 
https://issues.apache.org/jira/browse/FLINK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306805#comment-17306805
 ] 

Kezhu Wang commented on FLINK-21817:
------------------------------------

I guess we four are probably talking about two inconsistencies:
 # Inconsistency between {{SplitAssignmentTracker}} and operator instances.
 # Inconsistency between split enumerator and operator instances about possible 
contract that split assignment enforced by concrete split enumerators. For 
{{KafkaSource}}, the contract is {{KafkaSourceEnumerator.getSplitOwner}}.

I did not aware of #1, but there are related anyway.

IIUC, {{SplitAssignmentTracker}} is responsible for region recovering from last 
successful checkpoint. If this is the case, I think it makes no sense to 
snapshot and restore it.

For #2, I am not sure whether it is a valid feature request. I assume yes for 
now. I think {{void addReader(int subtaskId, List<SplitT> splits)}} will help 
{{SourceOperator}} to piggyback restored splits on open(eg. no 
{{SourceReader.addSplits}} on operator startup). To make it configurable, a 
boolean flag in {{Source}} could guide {{SourceOperator}} how to do on 
restoring. Comparing to legacy source, this will not only make 
{{SplitEnumerator}} a split assigner but *the* assigner.
{quote}I think we need to make sure users understand that the Enumerator should 
not in any case make any assumption about what is going on at the readers after 
a failure or restore. That is an anti-pattern in my experience, doing 
bookkeeping twice and trying to keep it in sync. It will very often break at 
some points.
{quote}
I agree this. {{KafkaSourceEnumState.currentAssignment}} is misleading also and 
will inconsistent with operator instances after rescaling.

For solutions offered by  [~renqs], I guess we all opt out solution#3. For 
solution#1, I did not see values to snapshot and restore 
{{SplitAssignmentTracker}}. For solution#2, how to revoke in case of 
inconsistency#2 is valid ?

> New Kafka Source might break subtask and split assignment upon rescale
> ----------------------------------------------------------------------
>
>                 Key: FLINK-21817
>                 URL: https://issues.apache.org/jira/browse/FLINK-21817
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.2
>            Reporter: Kezhu Wang
>            Priority: Blocker
>             Fix For: 1.13.0, 1.12.3
>
>
> On restoring, splits are add back directly to {{SourceReader}} in 
> {{SourceOperator}}. In no rescaled restoring, bindings between splits and 
> subtasks are preserved due to no repartition in 
> {{RoundRobinOperatorStateRepartitioner}}. But in rescaled restoring, these 
> operator states will be redistributed cross existing subtasks. This might 
> break possible assignment from {{SourceEnumerator}}.
> Given {{KafkaSource}} as an example, the partition to subtask assignment is 
> decided by {{KafkaSourceEnumerator.getSplitOwner}}. The mappings will break 
> after rescaling.
> I pushed [a test 
> case|https://github.com/kezhuw/flink/commit/9dc13cd9d7355d468a6ac8aae1b14b3a267581b6#diff-ad6e86c3757199ac3247687a71f9c34ee67b9ac743ae88a9f660950f27bec6eeR238]
>  using {{KafkaSource}} for evaluation.
> I think it requires api addition to solve in generic and configurable way.
> Is it a valid issue ? I am not that sure.
> cc  [~jqin] [~sewen]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to