[ https://issues.apache.org/jira/browse/FLINK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306985#comment-17306985 ]

Stephan Ewen commented on FLINK-21817:
--------------------------------------

Thanks for this discussion and the analysis of the problems.

To fix this problem, I would suggest that we not offer a general solution on
the basic source enumerator interface (the {{SourceCoordinator}}) for tracking
which readers hold which splits. If we want to track this in the Kafka Source,
let's implement it only in the Kafka Source, with a custom event that each
reader sends as the first thing after registration.

But we may even skip this for now and not track splits in the Kafka Source for
this release. No functionality currently needs this tracking, and it is a
source of errors, so I would suggest adding split tracking later, when we
actually need it and have better test coverage through a common test suite that
includes tests that change the parallelism. Then we can be more confident that
we catch such issues.


So my suggested actions would be:

(1) Remove the SplitAssignmentTracker from the snapshots, because it isn't 
needed and might actually cause problems.
(2) Remove the split-to-reader tracking in the Kafka Source.

==> These two are 1.13 blockers

(3) Ensure our source test suite is in place
(4) Add split-to-reader tracking to the Kafka Source when we need this functionality
(5) Move this logic to the common source interface once it is stable in the 
Kafka Source


As a side note, I think split rebalancing isn't something we should start doing
now, because there is no way to guarantee the order of events in a partition
when rebalancing: events read by the newly assigned reader can overtake events
still in flight from the old reader. We would need something like epochs to
establish clear before/after relationships for in-flight events. We get these
implicitly through checkpoints, which is why rescaling is safe.
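To make the overtaking problem concrete, here is a small simulation under a hypothetical epoch scheme (not an existing Flink feature): each record carries the ownership epoch of the reader that emitted it, the old owner emits an end-of-epoch marker after its last record, and the downstream side holds back records of epoch e+1 until the marker for epoch e has arrived. All class, method, and constant names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EpochOrderingSketch {

    // Hypothetical marker offset the old owner emits after its last record of an epoch.
    static final long END_OF_EPOCH = -1L;

    // A record from one partition, tagged with the ownership epoch of its reader.
    static final class Rec {
        final int epoch;
        final long offset;

        Rec(int epoch, long offset) {
            this.epoch = epoch;
            this.offset = offset;
        }
    }

    // Without epochs: downstream emits records in whatever order they arrive.
    static List<Long> naive(List<Rec> arrivals) {
        List<Long> out = new ArrayList<>();
        for (Rec r : arrivals) {
            if (r.offset != END_OF_EPOCH) {
                out.add(r.offset);
            }
        }
        return out;
    }

    // With epochs: records of a later epoch are buffered until the end-of-epoch
    // marker of the current epoch arrives. Assumes records within one epoch come
    // from a single reader over one FIFO channel, so they arrive in offset order.
    static List<Long> deliverInOrder(List<Rec> arrivals) {
        Map<Integer, Deque<Rec>> pending = new HashMap<>();
        List<Long> out = new ArrayList<>();
        int current = 0;
        for (Rec r : arrivals) {
            pending.computeIfAbsent(r.epoch, k -> new ArrayDeque<>()).addLast(r);
            // Drain whatever is now safe to emit, advancing past completed epochs.
            Deque<Rec> queue = pending.get(current);
            while (queue != null && !queue.isEmpty()) {
                Rec next = queue.pollFirst();
                if (next.offset == END_OF_EPOCH) {
                    pending.remove(current);
                    current++;
                    queue = pending.get(current);
                } else {
                    out.add(next.offset);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The old owner (epoch 0) still has offsets 3 and 4 in flight while the
        // new owner (epoch 1) starts at offset 5, and some of its records overtake them.
        List<Rec> arrivals = List.of(
                new Rec(0, 3), new Rec(1, 5), new Rec(1, 6),
                new Rec(0, 4), new Rec(0, END_OF_EPOCH), new Rec(1, 7));
        System.out.println(naive(arrivals));          // [3, 5, 6, 4, 7]
        System.out.println(deliverInOrder(arrivals)); // [3, 4, 5, 6, 7]
    }
}
```

The buffering is the price of the before/after relationship: the new owner's records cannot be emitted until the old owner has provably finished, which is the same kind of boundary a checkpoint provides implicitly during rescaling.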

The question of how to preserve per-key order semantics with in-flight data is
one we already worked through for unaligned checkpoints. It is really complex to
come up with a simple but effective semantic model, and unaligned checkpoints
currently work only on {{keyBy()}}, {{rebalance()}}, and {{random()}} data
exchanges (not {{forward()}} or {{rescale()}}), because only in those three
cases is it possible to provide meaningful semantics with Flink's current
parallel streams model.

Quickly doing something with source split rebalancing now will open up all
sorts of problems and bugs if we don't first answer the question of order
guarantees. So let's do one thing at a time: first get the basics of the new
sources running very well, then approach the next set of issues.

> New Kafka Source might break subtask and split assignment upon rescale
> ----------------------------------------------------------------------
>
>                 Key: FLINK-21817
>                 URL: https://issues.apache.org/jira/browse/FLINK-21817
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.12.2
>            Reporter: Kezhu Wang
>            Priority: Blocker
>             Fix For: 1.13.0, 1.12.3
>
>
> On restore, splits are added back directly to the {{SourceReader}} in
> {{SourceOperator}}. When restoring without rescaling, the bindings between
> splits and subtasks are preserved because
> {{RoundRobinOperatorStateRepartitioner}} does not repartition the state. But
> when restoring with rescaling, these operator states are redistributed across
> the existing subtasks, which might break the assignment chosen by the
> {{SourceEnumerator}}.
> Taking {{KafkaSource}} as an example, the partition-to-subtask assignment is
> decided by {{KafkaSourceEnumerator.getSplitOwner}}. These mappings will break
> after rescaling.
> I pushed [a test 
> case|https://github.com/kezhuw/flink/commit/9dc13cd9d7355d468a6ac8aae1b14b3a267581b6#diff-ad6e86c3757199ac3247687a71f9c34ee67b9ac743ae88a9f660950f27bec6eeR238]
>  using {{KafkaSource}} for evaluation.
> I think solving this in a generic and configurable way requires an API addition.
> Is this a valid issue? I am not entirely sure.
> cc [~jqin] [~sewen]
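The mismatch described in the quoted issue can be illustrated with a simplified model. Both functions below are assumptions: `owner` uses plain `partition % numReaders` instead of the real `KafkaSourceEnumerator.getSplitOwner` logic, and `redistribute` deals pooled split states out modulo the new parallelism, which is not the exact algorithm of `RoundRobinOperatorStateRepartitioner`. The only point is that the state redistribution never consults the enumerator's owner function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RescaleMismatchSketch {

    // Hypothetical stand-in for the enumerator's deterministic owner function
    // (simplified to partition % numReaders; not the real getSplitOwner logic).
    static int owner(int partition, int numReaders) {
        return partition % numReaders;
    }

    // Simplified stand-in for round-robin operator-state redistribution: split
    // states from all old subtasks are pooled and dealt out modulo the new
    // parallelism, without consulting owner().
    static Map<Integer, List<Integer>> redistribute(List<List<Integer>> oldStates, int newParallelism) {
        List<Integer> pooled = new ArrayList<>();
        for (List<Integer> state : oldStates) {
            pooled.addAll(state);
        }
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int i = 0; i < pooled.size(); i++) {
            result.computeIfAbsent(i % newParallelism, k -> new ArrayList<>()).add(pooled.get(i));
        }
        return result;
    }

    // Partitions that ended up on a different subtask than owner() would choose.
    static List<Integer> mismatches(Map<Integer, List<Integer>> assignment, int numReaders) {
        List<Integer> wrong = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : assignment.entrySet()) {
            for (int partition : e.getValue()) {
                if (owner(partition, numReaders) != e.getKey()) {
                    wrong.add(partition);
                }
            }
        }
        return wrong;
    }

    public static void main(String[] args) {
        // Parallelism 2 before rescaling: subtask 0 holds even partitions and
        // subtask 1 odd ones, exactly as owner(p, 2) would place them.
        List<List<Integer>> before = List.of(List.of(0, 2, 4), List.of(1, 3, 5));
        Map<Integer, List<Integer>> after = redistribute(before, 3);
        System.out.println(after);                // {0=[0, 1], 1=[2, 3], 2=[4, 5]}
        System.out.println(mismatches(after, 3)); // [1, 2, 3, 4]
    }
}
```

Without rescaling, each subtask restores exactly the splits it had, so the restored state and the owner function agree; only when the parallelism changes do the two diverge.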



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
