[
https://issues.apache.org/jira/browse/FLINK-39837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18086438#comment-18086438
]
Xin Gao commented on FLINK-39837:
---------------------------------
Hi [~jubinsoni]
No,
[apache/flink-connector-kafka#264|https://github.com/apache/flink-connector-kafka/pull/264]
intentionally contains connector-specific changes only.
For this issue, DynamicKafkaSource already owns the relevant checkpoint state:
reader split offsets and dynamic enumerator state. Retaining removed Kafka
cluster state there is sufficient to restore progress when the cluster is
re-added within the retention window.
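A minimal sketch of that retention idea follows, for illustration only. It is not the code in the pull request: the class `RemovedClusterRetention`, its methods, and the string cluster/partition keys are all hypothetical names, and real checkpoint state would be serialized rather than kept in in-memory maps. The assumption is that a removed cluster's offsets are kept alongside the removal time and restored only if the cluster comes back inside the window. A zero retention never restores, which stands in for keeping today's behavior as the default.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch only: these names are illustrative and are NOT the
// DynamicKafkaSource classes or the code in the pull request.
final class RemovedClusterRetention {

    // Offsets captured when a cluster was dropped, plus the removal time.
    private static final class Retained {
        final Map<String, Long> offsets;
        final long removedAtMillis;

        Retained(Map<String, Long> offsets, long removedAtMillis) {
            this.offsets = offsets;
            this.removedAtMillis = removedAtMillis;
        }
    }

    private final long retentionMillis;
    private final Map<String, Map<String, Long>> active = new HashMap<>();
    private final Map<String, Retained> removed = new HashMap<>();

    RemovedClusterRetention(Duration retention) {
        this.retentionMillis = retention.toMillis();
    }

    // Track the latest offsets (partition -> offset) of an active cluster.
    void recordOffsets(String clusterId, Map<String, Long> offsets) {
        active.put(clusterId, new HashMap<>(offsets));
    }

    // Instead of discarding state, move it to the retained set.
    void onClusterRemoved(String clusterId, long nowMillis) {
        Map<String, Long> offsets = active.remove(clusterId);
        if (offsets != null) {
            removed.put(clusterId, new Retained(offsets, nowMillis));
        }
    }

    // Returns the retained offsets if the cluster reappears inside the window;
    // empty means the caller falls back to its configured starting offsets.
    Optional<Map<String, Long>> onClusterAdded(String clusterId, long nowMillis) {
        Retained retained = removed.remove(clusterId);
        boolean restorable = retained != null
                && nowMillis - retained.removedAtMillis < retentionMillis;
        Map<String, Long> offsets = restorable ? retained.offsets : new HashMap<>();
        active.put(clusterId, offsets);
        return restorable
                ? Optional.of(Collections.unmodifiableMap(offsets))
                : Optional.empty();
    }

    // Drop retained entries whose window has elapsed (e.g. before a checkpoint).
    void expire(long nowMillis) {
        removed.values().removeIf(r -> nowMillis - r.removedAtMillis >= retentionMillis);
    }

    public static void main(String[] args) {
        RemovedClusterRetention state = new RemovedClusterRetention(Duration.ofMinutes(10));
        state.recordOffsets("cluster-a", Collections.singletonMap("topic-0", 42L));
        state.onClusterRemoved("cluster-a", 1_000L);  // metadata flaps to version n - 1
        // Metadata returns to version n one second later: offsets are restored.
        System.out.println(state.onClusterAdded("cluster-a", 2_000L));
    }
}
```

Timestamps are passed in explicitly so the behavior is easy to reason about; where the retained entries live and how they are serialized are left out of the sketch.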
The SplitAssignmentTracker / SourceCoordinatorSerdeUtils changes in
[apache/flink#28318|https://github.com/apache/flink/pull/28318] look like a
broader framework proposal for generic Flink sources, but they are not a
dependency for this Kafka connector fix.
Also, from the current diff in
[apache/flink-connector-kafka#265|https://github.com/apache/flink-connector-kafka/pull/265],
I don’t think it is sufficient yet to fully handle the DynamicKafkaSource
behavior needed here, especially the connector-specific removed-cluster
reader/enumerator semantics.
For the broader Flink framework change, I’m not sure whether there is already
an issue/discussion and consensus for introducing that generic mechanism.
> Make dynamic Kafka source resilient for Kafka topology instability
> -------------------------------------------------------------------
>
> Key: FLINK-39837
> URL: https://issues.apache.org/jira/browse/FLINK-39837
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Xin Gao
> Priority: Critical
> Labels: pull-request-available
>
> In the Flink dynamic Kafka source, the Kafka metadata service can consume and
> apply Kafka source topology changes dynamically. This reduces operational
> overhead on the happy path, but it also makes the system vulnerable to
> instability (the non-happy path).
> The Kafka topology metadata service/config can be flaky and slightly
> inconsistent in a distributed system. For example, during a rollout or
> restart, config versions can be inconsistent between primary and standby
> or across replicas. In such scenarios, the Flink Kafka metadata service might:
> # Get config version `n` at time `t`
> # Get config version `n - 1` at time `t + 1`
> # Get config version `n` at time `t + 2`
> As a result, the dynamic source might *drop a cluster* at step 2 and
> *instantly and permanently* remove it from the checkpoint. Then at step 3,
> even though the cluster is added back, its progress is lost (because the
> cluster was removed from the checkpoint) and the source ingests from EARLIEST.
> To make the source resilient in such distributed environments, I propose
> exposing an option (with the default behavior unchanged) to retain a removed
> cluster/partition in the checkpoint for x days (configurable), so that such
> flakiness does not cause progress to be lost.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)