Xin Gao created FLINK-39837:
-------------------------------

             Summary: Make dynamic Kafka source resilient to Kafka topology 
instability 
                 Key: FLINK-39837
                 URL: https://issues.apache.org/jira/browse/FLINK-39837
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: Xin Gao


In the Flink dynamic Kafka source, the Kafka metadata service can consume 
Kafka source topology changes and apply them dynamically. This reduces 
operational overhead on the happy path, but it also makes the system 
vulnerable to instability (the non-happy path).
The Kafka topology metadata service/config can be flaky and slightly 
inconsistent, as is common in a distributed system. For example, during a 
rollout or restart, config versions can differ between primary and standby or 
across replicas. In such scenarios, the Flink Kafka metadata service might 
observe the following sequence:
 # Get config version `n` at time `t`
 # Get config version `n - 1` at time `t + 1`
 # Get config version `n` at time `t + 2`

As a result, the dynamic source might *drop a cluster* at step 2 and 
*immediately and permanently* remove it from the checkpoint. At step 3, even 
if the cluster is added back, its progress is lost (because the cluster was 
removed from the checkpoint), and the source would ingest from EARLIEST.
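
The sequence above can be reproduced with a toy model. This is only a sketch 
of the behavior described in this issue, not the connector's code: the class 
EagerRemovalModel, its methods, and the use of -1 to mean "no saved progress" 
are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of checkpointed per-cluster progress (hypothetical, not a Flink class). */
final class EagerRemovalModel {
    /** Assumed checkpoint state: cluster id -> last saved offset. */
    private final Map<String, Long> checkpointedOffsets = new HashMap<>();

    void recordProgress(String clusterId, long offset) {
        checkpointedOffsets.put(clusterId, offset);
    }

    /** Applies one metadata snapshot, dropping absent clusters right away. */
    void onMetadataUpdate(Set<String> activeClusters) {
        // A cluster missing from the snapshot loses its saved progress immediately.
        checkpointedOffsets.keySet().retainAll(activeClusters);
        // A cluster that is new to the state starts with no saved progress (-1).
        for (String cluster : activeClusters) {
            checkpointedOffsets.putIfAbsent(cluster, -1L);
        }
    }

    /** Returns the saved offset, or -1 if the cluster has no saved progress. */
    long offsetOf(String clusterId) {
        return checkpointedOffsets.getOrDefault(clusterId, -1L);
    }
}
```

Feeding this model the snapshots {a, b}, then {a}, then {a, b} again leaves 
cluster b with no saved offset, even if progress was recorded for it before 
the second snapshot. In the real source that corresponds to restarting b from 
EARLIEST.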

Hence, to make the source resilient to such inconsistencies, I propose 
exposing an option (with the default keeping today's behavior) to retain a 
removed cluster/partition in the checkpoint for x days (configurable), so 
that such flakiness does not cause progress to be lost.
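
One possible shape for the proposed retention is sketched below. It is an 
illustration under assumptions, not a design for the connector: 
RetainingClusterState, its methods, and the retention parameter are 
hypothetical, and -1 again stands for "no saved progress". A retention of 
zero reproduces the immediate removal described above, matching the idea of 
keeping the default the same as today.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Toy model that keeps removed clusters for a grace period (hypothetical class). */
final class RetainingClusterState {
    private final Duration retention;
    /** Assumed checkpoint state: cluster id -> last saved offset. */
    private final Map<String, Long> offsets = new HashMap<>();
    /** Time at which each currently absent cluster was first seen missing. */
    private final Map<String, Long> removedAtMillis = new HashMap<>();

    RetainingClusterState(Duration retention) {
        this.retention = retention;
    }

    void recordProgress(String clusterId, long offset) {
        offsets.put(clusterId, offset);
    }

    /** Applies one metadata snapshot observed at nowMillis. */
    void onMetadataUpdate(Set<String> activeClusters, long nowMillis) {
        for (String cluster : activeClusters) {
            removedAtMillis.remove(cluster);     // cluster is back: cancel pending removal
            offsets.putIfAbsent(cluster, -1L);   // unseen cluster: no saved progress yet
        }
        for (String cluster : offsets.keySet()) {
            if (!activeClusters.contains(cluster)) {
                // Remember only the first time the cluster went missing.
                removedAtMillis.putIfAbsent(cluster, nowMillis);
            }
        }
        // Drop saved progress only once the retention period has fully elapsed.
        Iterator<Map.Entry<String, Long>> it = removedAtMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= retention.toMillis()) {
                offsets.remove(entry.getKey());
                it.remove();
            }
        }
    }

    boolean isTracked(String clusterId) {
        return offsets.containsKey(clusterId);
    }

    /** Returns the saved offset, or -1 if the cluster has no saved progress. */
    long offsetOf(String clusterId) {
        return offsets.getOrDefault(clusterId, -1L);
    }
}
```

With a one-day retention, the n, n - 1, n sequence leaves the saved offset of 
the flapping cluster intact, while a cluster that stays absent for the whole 
retention period is still cleaned up. The model only covers what is kept in 
state; whether the source keeps reading from a retained cluster while it is 
absent is a separate decision that this sketch does not address.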



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
