[
https://issues.apache.org/jira/browse/FLINK-21817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306985#comment-17306985
]
Stephan Ewen commented on FLINK-21817:
--------------------------------------
Thanks for this discussion and the analysis of the problems.
I would say that to fix this problem, let's not offer a general solution on the
basic source enumerator (the {{SourceCoordinator}}) interface to track which
readers hold which splits. I would suggest that if we want to track this in the
Kafka Source, then let's implement it only in the Kafka Source with a custom
event that is sent as the first thing after registration.
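
A minimal sketch of what such a connector-local event could look like. Everything
here is hypothetical: {{HeldSplitsEvent}} and {{ReaderSplitTracker}} are invented
names, and the {{SourceEvent}} interface is a local stand-in so the example
compiles without Flink on the classpath. It only shows the idea that a reader
reports its splits right after registration and the enumerator side records them.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SplitReportSketch {

    // Local stand-in for a source event marker interface (assumption, not Flink's class).
    interface SourceEvent extends Serializable {}

    // Hypothetical event: a reader reports the IDs of the splits it holds.
    static final class HeldSplitsEvent implements SourceEvent {
        private static final long serialVersionUID = 1L;
        final List<String> splitIds;

        HeldSplitsEvent(List<String> splitIds) {
            this.splitIds = new ArrayList<>(splitIds);
        }
    }

    // Hypothetical enumerator-side bookkeeping that lives only in the connector.
    static final class ReaderSplitTracker {
        private final Map<Integer, Set<String>> splitsByReader = new HashMap<>();

        void handleSourceEvent(int subtaskId, SourceEvent event) {
            if (event instanceof HeldSplitsEvent) {
                // A fresh report replaces whatever was recorded for this reader before.
                HeldSplitsEvent report = (HeldSplitsEvent) event;
                splitsByReader.put(subtaskId, new HashSet<>(report.splitIds));
            }
        }

        Set<String> splitsOf(int subtaskId) {
            return splitsByReader.getOrDefault(subtaskId, Collections.<String>emptySet());
        }
    }

    public static void main(String[] args) {
        ReaderSplitTracker tracker = new ReaderSplitTracker();
        // Reader 1 registers and reports the splits it restored from state.
        tracker.handleSourceEvent(1, new HeldSplitsEvent(Arrays.asList("topic-0", "topic-3")));
        System.out.println("reader 1 holds " + tracker.splitsOf(1));
        System.out.println("reader 0 holds " + tracker.splitsOf(0));
    }
}
```

Because each report replaces the previous entry for that reader, the recorded
mapping reflects what the reader actually restored rather than what the
enumerator assumed it had assigned.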
But we may even skip this for now, and not track splits in the Kafka Source for
this release. No functionality needs this tracking today, and it is
a source of errors, so I suggest adding the split tracking later, when we
actually need it and have better test coverage from a common test suite that
includes tests that change the parallelism. Then we can be more confident of
catching such issues.
So my suggested actions would be:
(1) Remove the SplitAssignmentTracker from the snapshots, because it isn't
needed and might actually cause problems.
(2) Remove the split-to-reader tracking in the Kafka Source.
==> These two are 1.13 blockers
(3) Ensure our source test suite is in place
(4) Add split-to-reader-tracking to Kafka Source when we need this functionality
(5) Move this logic to the common source interface once it is stable in the
Kafka Source
As a side note, I think split rebalancing isn't something we should start doing
now, because there is no way to guarantee the order of events in a partition when
rebalancing: the events read by the newly assigned reader can overtake events
from the old reader. We need something like epochs to establish clear
before/after relationships for the in-flight events. We get those implicitly
through checkpoints, which is why rescaling is safe.
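
To make the overtaking problem concrete, here is a small, self-contained
simulation. It is not a Flink mechanism: the {{Event}} type, the epoch numbers,
and the end-of-epoch marker are all assumptions made for illustration. The old
reader (epoch 0) still has offsets 1 and 2 in flight when the newly assigned
reader (epoch 1) already delivers offsets 3 and 4. Taking events in arrival
order breaks the partition order, while holding back later-epoch events until
the earlier epoch is closed restores it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EpochOrderingSketch {

    // Hypothetical in-flight record of one partition, tagged with its reader's epoch.
    static final class Event {
        final int epoch;
        final long offset;
        final boolean endOfEpoch;

        private Event(int epoch, long offset, boolean endOfEpoch) {
            this.epoch = epoch;
            this.offset = offset;
            this.endOfEpoch = endOfEpoch;
        }

        static Event record(int epoch, long offset) {
            return new Event(epoch, offset, false);
        }

        static Event end(int epoch) {
            return new Event(epoch, -1L, true);
        }
    }

    // Arrival sequence in which the new reader (epoch 1) overtakes the old one (epoch 0).
    static List<Event> overtakingExample() {
        return Arrays.asList(
                Event.record(0, 0), Event.record(1, 3), Event.record(1, 4),
                Event.record(0, 1), Event.record(0, 2), Event.end(0), Event.record(1, 5));
    }

    // Naive consumer: takes records exactly as they arrive.
    static List<Long> arrivalOrder(List<Event> arrivals) {
        List<Long> out = new ArrayList<>();
        for (Event e : arrivals) {
            if (!e.endOfEpoch) {
                out.add(e.offset);
            }
        }
        return out;
    }

    // Epoch-aware consumer: holds back later epochs until the current epoch has ended.
    static List<Long> epochOrder(List<Event> arrivals) {
        int current = 0;
        Map<Integer, List<Event>> heldBack = new HashMap<>();
        Deque<Event> pending = new ArrayDeque<>(arrivals);
        List<Long> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            Event e = pending.pollFirst();
            if (e.epoch > current) {
                heldBack.computeIfAbsent(e.epoch, k -> new ArrayList<>()).add(e);
            } else if (!e.endOfEpoch) {
                out.add(e.offset);
            } else if (e.epoch == current) {
                current++;
                List<Event> released = heldBack.remove(current);
                if (released != null) {
                    // Re-queue held events at the front, preserving their arrival order.
                    for (int i = released.size() - 1; i >= 0; i--) {
                        pending.addFirst(released.get(i));
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("arrival order: " + arrivalOrder(overtakingExample()));
        System.out.println("epoch order:   " + epochOrder(overtakingExample()));
    }
}
```

In this toy model the end-of-epoch marker plays the role that a checkpoint plays
during rescaling: it is the point after which no more events from the old
assignment can arrive.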
The discussion about how to preserve order semantics per key with in-flight
data is something we did for unaligned checkpoints. It is really complex to
come up with a simple but effective semantic model, and unaligned checkpoints
currently work only on {{keyBy()}}, {{rebalance()}}, and random ({{shuffle()}})
data exchanges (not {{forward()}} or {{rescale()}}) because only for those three
cases is it possible to provide meaningful semantics with the current parallel
streams model that Flink has.
Quickly adding source split rebalancing now would open up all
sorts of problems and bugs unless we first answer the question about order
guarantees. So let's do one thing at a time: first get the basics of the
new sources running very well, then approach the next set of issues.
> New Kafka Source might break subtask and split assignment upon rescale
> ----------------------------------------------------------------------
>
> Key: FLINK-21817
> URL: https://issues.apache.org/jira/browse/FLINK-21817
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.12.2
> Reporter: Kezhu Wang
> Priority: Blocker
> Fix For: 1.13.0, 1.12.3
>
>
> On restore, splits are added back directly to the {{SourceReader}} in
> {{SourceOperator}}. When restoring without rescaling, the bindings between
> splits and subtasks are preserved because
> {{RoundRobinOperatorStateRepartitioner}} does not repartition. But when
> restoring with rescaling, these operator states are redistributed across the
> existing subtasks. This might break the assignment made by the
> {{SourceEnumerator}}.
> Taking {{KafkaSource}} as an example, the partition-to-subtask assignment is
> decided by {{KafkaSourceEnumerator.getSplitOwner}}. These mappings will break
> after rescaling.
> I pushed [a test
> case|https://github.com/kezhuw/flink/commit/9dc13cd9d7355d468a6ac8aae1b14b3a267581b6#diff-ad6e86c3757199ac3247687a71f9c34ee67b9ac743ae88a9f660950f27bec6eeR238]
> using {{KafkaSource}} for evaluation.
> I think solving this in a generic and configurable way requires an API addition.
> Is this a valid issue? I am not sure.
> cc [~jqin] [~sewen]
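
The mismatch described in the quoted issue can be reproduced with a toy model.
The sketch below rests on simplifying assumptions: the owner function is reduced
to {{partition % numReaders}} (the real {{getSplitOwner}} may compute it
differently), and the restore step pools all splits and deals them out one by
one, which only approximates what a round-robin state repartitioner does. The
class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class RescaleMismatchSketch {

    // Assumed, simplified owner function: partition p belongs to reader p % numReaders.
    static int expectedOwner(int partition, int numReaders) {
        return partition % numReaders;
    }

    // Splits as the enumerator would have assigned them at the given parallelism.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            state.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            state.get(expectedOwner(p, parallelism)).add(p);
        }
        return state;
    }

    // Simplified restore: unchanged parallelism keeps state in place; otherwise
    // all splits are pooled in subtask order and dealt out round-robin.
    static List<List<Integer>> restore(List<List<Integer>> oldState, int newParallelism) {
        if (oldState.size() == newParallelism) {
            return oldState;
        }
        List<Integer> pooled = new ArrayList<>();
        for (List<Integer> splits : oldState) {
            pooled.addAll(splits);
        }
        List<List<Integer>> newState = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newState.add(new ArrayList<>());
        }
        for (int i = 0; i < pooled.size(); i++) {
            newState.get(i % newParallelism).add(pooled.get(i));
        }
        return newState;
    }

    // Counts restored splits that sit on a subtask other than their expected owner.
    static int countMisplaced(int numPartitions, int oldParallelism, int newParallelism) {
        List<List<Integer>> restored =
                restore(assign(numPartitions, oldParallelism), newParallelism);
        int misplaced = 0;
        for (int subtask = 0; subtask < restored.size(); subtask++) {
            for (int partition : restored.get(subtask)) {
                if (expectedOwner(partition, newParallelism) != subtask) {
                    misplaced++;
                }
            }
        }
        return misplaced;
    }

    public static void main(String[] args) {
        System.out.println("same parallelism: " + countMisplaced(6, 2, 2) + " misplaced");
        System.out.println("rescaled 2 -> 3:  " + countMisplaced(6, 2, 3) + " misplaced");
    }
}
```

With six partitions, restoring at the same parallelism leaves every split on its
expected owner, while rescaling from 2 to 3 subtasks in this model leaves four of
the six splits on a subtask the owner function would not have chosen.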