Hi All,

I would like to add support for a simple backpressure control mechanism between
the Coordinator and the Worker in the iceberg-kafka-connect connector, as
described in detail below. Please take a look at the feature request:

https://github.com/apache/iceberg/issues/16389

1. Proposed Change

We are proposing a simple backpressure control mechanism between
iceberg-kafka-connect’s Coordinator and Worker, to be used when the Coordinator
is overloaded or in trouble. We propose that the Worker detect whether the
Coordinator is making progress and pause itself when it is not, to prevent
unbounded growth of control topic messages on the Coordinator.

2. The Problem

In the current iceberg-kafka-connect, the Coordinator and the Worker work in a
more or less decoupled mode, with control flowing one way, from the Coordinator
to the Worker, through a control topic. When the Coordinator hits a network
issue while committing to the Iceberg catalog server and the current commit
fails, the Worker keeps working and generating more and more new data files on
S3, which causes the Coordinator even bigger trouble on the next commit cycle.

3. The Solution

Building a sophisticated backpressure feedback loop between the Coordinator and
the Worker would be complex, but there is a relatively simple solution.

The Idea

The Worker can detect that the Coordinator is in trouble:

The Worker can read both START_COMMIT and COMMIT_COMPLETE messages from the
control topic. When the Worker sees a START_COMMIT message without a matching
COMMIT_COMPLETE message, it can infer that the Coordinator did not finish the
Iceberg commit in that cycle. If this happens for several consecutive cycles,
the Worker can conclude that the Coordinator is in trouble and that it should
slow down or pause.
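A minimal sketch of this detection logic follows. The class
CoordinatorProgressTracker and its methods are hypothetical and not part of the
connector. It assumes that START_COMMIT and COMMIT_COMPLETE events carry a
commit ID (a UUID here) that lets the Worker pair them, and that a cycle counts
as failed when the next START_COMMIT arrives before a matching COMMIT_COMPLETE.

```java
import java.util.UUID;

/**
 * Hypothetical helper: infers whether the Coordinator is making progress by
 * pairing START_COMMIT and COMMIT_COMPLETE events read from the control topic.
 */
final class CoordinatorProgressTracker {
  private final int maxFailedCycles;   // threshold for declaring the Coordinator stalled
  private UUID pendingCommitId;        // commit ID of the last START_COMMIT not yet completed
  private int consecutiveFailedCycles; // cycles that ended without a matching COMMIT_COMPLETE

  CoordinatorProgressTracker(int maxFailedCycles) {
    if (maxFailedCycles < 1) {
      throw new IllegalArgumentException("maxFailedCycles must be >= 1");
    }
    this.maxFailedCycles = maxFailedCycles;
  }

  /** Called when the Worker reads a START_COMMIT event (assumed to carry a commit ID). */
  void onStartCommit(UUID commitId) {
    if (pendingCommitId != null) {
      // A new cycle started before the previous one completed: count it as failed.
      consecutiveFailedCycles++;
    }
    pendingCommitId = commitId;
  }

  /** Called when the Worker reads a COMMIT_COMPLETE event; stale IDs are ignored. */
  void onCommitComplete(UUID commitId) {
    if (commitId.equals(pendingCommitId)) {
      // The current cycle finished, so the failure streak is broken.
      pendingCommitId = null;
      consecutiveFailedCycles = 0;
    }
  }

  boolean isStalled() {
    return consecutiveFailedCycles >= maxFailedCycles;
  }

  int consecutiveFailedCycles() {
    return consecutiveFailedCycles;
  }
}
```

Because a failure is only counted when the next START_COMMIT arrives, the
verdict lags by one commit interval. A Coordinator that stops emitting
START_COMMIT altogether would not be caught by this counter alone and would
need a separate timeout.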

The Worker can then pause its own work while the Coordinator is in trouble.
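One possible way for the Worker to pause is Kafka Connect's SinkTaskContext,
which provides pause(TopicPartition...), resume(TopicPartition...) and
assignment(). To keep the sketch self-contained, the hypothetical
PartitionControl interface below stands in for that subset, with partitions as
plain strings. The hypothetical BackpressureGate pauses or resumes only when
the stall verdict changes.

```java
import java.util.Set;

/** Hypothetical stand-in for the pause/resume/assignment subset of SinkTaskContext. */
interface PartitionControl {
  Set<String> assignment();

  void pause(Set<String> partitions);

  void resume(Set<String> partitions);
}

/** Hypothetical gate that stops consuming source partitions while the Coordinator is stalled. */
final class BackpressureGate {
  private final PartitionControl control;
  private boolean paused;

  BackpressureGate(PartitionControl control) {
    this.control = control;
  }

  /** Call with the latest stall verdict; acts only on a change of state. */
  void update(boolean coordinatorStalled) {
    if (coordinatorStalled && !paused) {
      control.pause(control.assignment());
      paused = true;
    } else if (!coordinatorStalled && paused) {
      control.resume(control.assignment());
      paused = false;
    }
  }

  boolean isPaused() {
    return paused;
  }
}
```

This assumes the control topic is read by a consumer separate from the one
that pause() affects, so the Worker keeps seeing COMMIT_COMPLETE messages while
paused and can resume once the Coordinator recovers.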

4. The Proposal

Proposed spec changes:

Add two optional config parameters: one that enables Coordinator progress
detection, and one that sets how many consecutive failed commit cycles mark the
Coordinator as stalled.
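A sketch of how the two options might be parsed. Only
iceberg.coordinator.progress.detection.enabled and its false default come from
this proposal; the threshold name
iceberg.coordinator.progress.detection.max-failed-cycles and its default of 3
are hypothetical placeholders. The connector itself defines its options in
IcebergSinkConfig using Kafka's ConfigDef; a plain Map is used here to keep the
example self-contained.

```java
import java.util.Map;

/** Hypothetical holder for the two proposed progress-detection options. */
final class ProgressDetectionConfig {
  // Name taken from the proposal.
  static final String ENABLED_PROP = "iceberg.coordinator.progress.detection.enabled";
  // Hypothetical name and default: the proposal does not name the threshold option.
  static final String MAX_FAILED_CYCLES_PROP =
      "iceberg.coordinator.progress.detection.max-failed-cycles";
  static final int DEFAULT_MAX_FAILED_CYCLES = 3;

  final boolean enabled;
  final int maxFailedCycles;

  ProgressDetectionConfig(Map<String, String> props) {
    // Off unless explicitly enabled, so existing deployments keep today's behavior.
    this.enabled = Boolean.parseBoolean(props.getOrDefault(ENABLED_PROP, "false"));
    int cycles = Integer.parseInt(
        props.getOrDefault(MAX_FAILED_CYCLES_PROP, String.valueOf(DEFAULT_MAX_FAILED_CYCLES)).trim());
    if (cycles < 1) {
      throw new IllegalArgumentException(MAX_FAILED_CYCLES_PROP + " must be >= 1, got " + cycles);
    }
    this.maxFailedCycles = cycles;
  }
}
```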

Changes to iceberg-kafka-connect classes:

Worker class
- receive() method: ...
- save() method: ...

5. Breaking changes/incompatibilities

None. The new behavior is controlled by the config parameter
iceberg.coordinator.progress.detection.enabled, which defaults to false, so
existing deployments are unaffected unless they opt in.

Please provide your feedback. Thanks,

Henry Haiying Cai