Hi All,

I would like to add support for a simple backpressure control between the Coordinator and the Worker in the iceberg-kafka-connect connector, as described in detail below. Please take a look at the feature request:
https://github.com/apache/iceberg/issues/16389

1. Proposed Change

We propose a simple backpressure control mechanism between the iceberg-kafka-connect Coordinator and Worker, to be used when the Coordinator is overloaded or in trouble. The Worker would track the Coordinator’s progress and pause itself when the Coordinator stalls, to prevent exponential growth of control topic messages on the Coordinator.

2. The Problem

In the current iceberg-kafka-connect, the Coordinator and the Worker run in a more or less decoupled mode, with one-way communication from the Coordinator to the Worker through a control topic. When the Coordinator hits a network issue while committing to the Iceberg Catalog Server and the current commit fails, the Worker keeps working and generates more and more new data files on S3, which causes the Coordinator even bigger trouble in the next commit cycle.

3. The Solution

Building a sophisticated backpressure control feedback loop between the Coordinator and the Worker can be complex, but there is a relatively simple solution here.

The Idea

- The Worker can detect that the Coordinator is in trouble: the Worker can read both START_COMMIT and COMMIT_COMPLETE messages from the control topic. When the Worker sees a START_COMMIT message without a matching COMMIT_COMPLETE message, it knows the Coordinator did not finish the Iceberg commit in that cycle. If this happens for several consecutive cycles, the Worker knows the Coordinator is in trouble and that it needs to slow down or pause.
- The Worker can pause its work while the Coordinator is in trouble.

4. The Proposal

Proposed spec changes:
- Add two optional config parameters: one to control whether Coordinator progress detection is enabled, and one to set how many failed commit cycles mark the Coordinator as stalled.

Changes to iceberg-kafka-connect classes:
Worker class
- receive() method: ...
- save() method: ...

5. Breaking changes/incompatibilities

None. The new behavior is controlled by the config parameter iceberg.coordinator.progress.detection.enabled, which has a default value of false.
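
To make the detection idea in section 3 concrete, here is a minimal sketch in Java of how a Worker could count commit cycles that never completed. It is only an illustration under stated assumptions, not the proposed implementation: the class name CoordinatorProgressTracker, its methods, and the stallThreshold parameter are hypothetical, and it assumes that each START_COMMIT and COMMIT_COMPLETE message carries a commit ID that can be matched.

```java
import java.util.UUID;

/**
 * Hypothetical helper (not part of iceberg-kafka-connect): tracks whether
 * each START_COMMIT seen on the control topic was followed by a matching
 * COMMIT_COMPLETE, and reports a stall after too many missed cycles.
 */
final class CoordinatorProgressTracker {
  // Assumed to come from the second proposed config parameter.
  private final int stallThreshold;
  // Commit ID of the last START_COMMIT that has not been completed yet.
  private UUID pendingCommitId;
  // Number of consecutive cycles that ended without COMMIT_COMPLETE.
  private int consecutiveMisses;

  CoordinatorProgressTracker(int stallThreshold) {
    if (stallThreshold < 1) {
      throw new IllegalArgumentException("stallThreshold must be >= 1");
    }
    this.stallThreshold = stallThreshold;
  }

  /** Call when the Worker reads a START_COMMIT message. */
  void onStartCommit(UUID commitId) {
    if (pendingCommitId != null) {
      // The previous cycle never completed before this one started.
      consecutiveMisses++;
    }
    pendingCommitId = commitId;
  }

  /** Call when the Worker reads a COMMIT_COMPLETE message. */
  void onCommitComplete(UUID commitId) {
    // Ignore completions that do not match the cycle currently pending.
    if (commitId.equals(pendingCommitId)) {
      pendingCommitId = null;
      consecutiveMisses = 0;
    }
  }

  /** True once the number of consecutive missed cycles reaches the threshold. */
  boolean isStalled() {
    return consecutiveMisses >= stallThreshold;
  }
}
```

In this sketch the Worker would call onStartCommit() and onCommitComplete() while processing control topic messages, and check isStalled() before writing more data. Any successful commit resets the counter, so the Worker would resume as soon as the Coordinator recovers.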

Please provide your feedback.

Thanks,
Henry Haiying Cai
