HenryCaiHaiying opened a new issue, #16389: URL: https://github.com/apache/iceberg/issues/16389
### Proposed Change

We propose a simple backpressure control mechanism between the iceberg-kafka-connect Coordinator and Worker, to be used when the Coordinator is overloaded or in trouble. The Worker would detect the Coordinator's commit progress and pause itself, to prevent exponential growth of control topic messages on the Coordinator.

### The background of the iceberg-kafka-connect design

In the iceberg-kafka-connect framework, N Workers are launched in the cluster. They consume Kafka messages from source topic partitions and write the transformed data to S3 (or another data storage system). There is only one Coordinator, which writes the metadata to the Iceberg Catalog Server (e.g. HiveMetaStore). The metadata includes the list of data file names each Worker writes to S3. The Workers and the Coordinator communicate with each other through a special Kafka control topic.

The original design allows the Worker and the Coordinator to operate in a decoupled fashion. The Worker consumes Kafka source topic partition data, transforms it, and writes it to S3 on its own cadence. The Coordinator wakes up periodically and starts a commit cycle. During the commit cycle, the Coordinator gathers metadata from the Workers through messages on the control topic:

1. The Coordinator [sends](https://github.com/apache/iceberg/blob/main/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L120) the START_COMMIT message to the control topic.
2. Each Worker reads the START_COMMIT message and reports back with a DATA_WRITTEN message for each Kafka source topic partition it has processed, plus a DATA_COMPLETE message for the Worker's overall status. These messages are published on the control topic.
3. The Coordinator [reads](https://github.com/apache/iceberg/blob/main/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L143) the DATA_WRITTEN messages, extracts their content (e.g. the data file names the Worker wrote to S3), and writes that information to the Iceberg Catalog Server.
4. When all writes to the Iceberg Catalog Server finish successfully, the Coordinator commits to the Iceberg Catalog Server and publishes a COMMIT_COMPLETE message to the control topic.

On the control topic, coordination signals flow in one direction only, from the Coordinator to the Workers: the only Coordinator message a Worker processes is START_COMMIT.

### The Problem

There is no backpressure control from the Coordinator to the Worker. When a network issue or an Iceberg Catalog Server availability issue prevents the Coordinator from writing all the metadata entries it needs to write to the Iceberg Catalog Server, it has to fail the current commit. It then sleeps for a few minutes _(iceberg.control.commit.interval-ms, with a default value of 300 seconds)_ and retries on the next commit cycle. Because the commit is atomic, it has to retry writing every metadata entry.

During those few minutes, however, all the Workers keep working: they consume more from the Kafka source topic partitions and keep generating new data files on S3. On the next commit, the Coordinator has to write all the metadata entries from the failed cycle plus all the new ones generated in the last 300 seconds. If the network issue around the Iceberg Catalog Server persists, the second commit cycle is prone to fail again. As time goes on, the backlog of messages waiting to be processed in the control topic keeps growing, and the Coordinator cannot finish a successful commit without manual intervention, even long after the initial network issue is resolved.

In our environment, with a few thousand Kafka Connect (KC) worker tasks, we have seen a 15-minute HMS stability issue cause iceberg-kafka-connect to underperform for hours afterwards, until someone intervened manually.

A distributed data system usually has built-in backpressure control: when there is a problem writing to downstream systems, reading from upstream systems needs to slow down so that it does not exacerbate the situation. The iceberg-kafka-connect Worker has natural backpressure control when it has problems writing to S3. However, there is no feedback signal from the Coordinator to the Worker when the Coordinator is in trouble.

### The Solution

Building a sophisticated backpressure feedback loop between the Coordinator and the Worker can be complex, but there is a relatively simple solution here.

#### The Idea

##### Worker can detect that Coordinator is in trouble:

The Worker can read both START_COMMIT and COMMIT_COMPLETE messages from the control topic. When the Worker sees a START_COMMIT message without a matching COMMIT_COMPLETE message, it knows the Coordinator did not finish the Iceberg commit in that cycle. If this happens for several consecutive cycles, the Worker knows the Coordinator is in trouble and that it needs to slow down or pause.

##### Worker can pause when the Coordinator is in trouble

When the Worker detects that the Coordinator is stalled, it can simply pause its work. A more sophisticated mechanism could have the Worker slow down gradually, but we want to keep the initial implementation simple. A simple pause relieves the pressure on the Coordinator side, so the Coordinator can finish its commit work and drain all the messages from the control topic.

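The detection and pause rules above can be sketched as a small state tracker. This is only an illustration under stated assumptions, not the proposed implementation: the class name `CoordinatorProgressTracker` and its methods are hypothetical, commit cycles are assumed to be identified by a `UUID`, and any COMMIT_COMPLETE is treated as proof of progress.

```java
import java.util.UUID;

// Hypothetical helper (not part of iceberg-kafka-connect): counts consecutive
// commit cycles that started but never completed, as seen on the control topic.
final class CoordinatorProgressTracker {
  private final int stalledCycles; // threshold, e.g. the proposed default of 3
  private UUID pendingCommitId;    // START_COMMIT seen, no COMMIT_COMPLETE yet
  private int failedCycles;        // consecutive cycles without a completion

  CoordinatorProgressTracker(int stalledCycles) {
    if (stalledCycles < 1) {
      throw new IllegalArgumentException("stalledCycles must be >= 1");
    }
    this.stalledCycles = stalledCycles;
  }

  // Call when a START_COMMIT message is read from the control topic.
  void onStartCommit(UUID commitId) {
    if (pendingCommitId != null) {
      // The previous cycle never produced a COMMIT_COMPLETE: count it as failed.
      failedCycles++;
    }
    pendingCommitId = commitId;
  }

  // Call when a COMMIT_COMPLETE message is read from the control topic.
  // Assumption: any completed commit means the Coordinator is making progress,
  // so the commit id is not compared against the pending one.
  void onCommitComplete(UUID commitId) {
    pendingCommitId = null;
    failedCycles = 0;
  }

  int failedCycles() {
    return failedCycles;
  }

  // True once enough consecutive cycles have failed.
  boolean isPaused() {
    return failedCycles >= stalledCycles;
  }
}
```

A Worker using such a tracker would check `isPaused()` before processing records. Note that it would have to keep reading the control topic while paused, otherwise it would never observe the COMMIT_COMPLETE that clears the pause.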
In the Kafka Connect framework, the way to pause processing for the Worker is to [throw a RetriableException from the IcebergSinkTask.put()](https://github.com/apache/kafka/blob/4.3/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L97) method. IcebergSinkTask.put() eventually calls Worker.save(), and the Worker can throw the exception at that point. Once the KC framework catches the RetriableException from the put() call, [it will pause the task and the source consumer.](https://github.com/apache/kafka/blob/4.3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L647)

##### Worker can resume once Coordinator is no longer in stalled state

Once the Coordinator is able to finish one commit cycle, it publishes the COMMIT_COMPLETE message on the control topic. When the Worker reads that message, it can unpause itself, which means it no longer throws a RetriableException from the IcebergSinkTask.put() call.

#### The Proposal

##### Proposed spec changes:

Add two optional config parameters that control whether Coordinator progress detection is enabled and how many failed commit cycles mark the Coordinator as stalled:

- property: iceberg.coordinator.progress.detection.enabled
  - default: false
  - description: Whether the Worker checks the Coordinator's commit progress
- property: iceberg.coordinator.progress.stalled.cycles
  - default: 3
  - description: How many failed Coordinator commit cycles mark the Coordinator as stalled

#### Changes to iceberg-kafka-connect classes

##### Worker class

- receive() method: processes both START_COMMIT and COMMIT_COMPLETE messages to count how many commit cycles the Coordinator has failed. Once it detects enough failed cycles, the Worker marks itself as PAUSED. Once it detects a new COMMIT_COMPLETE message, it clears the PAUSED state.
- save() method: if the Worker is in the PAUSED state, it throws a RetriableException instead of processing the incoming sinkRecords.

### Breaking changes/incompatibilities

None. The new behavior is controlled by the config parameter iceberg.coordinator.progress.detection.enabled, which defaults to false. If this parameter is not enabled, the Worker keeps the old behavior: it neither detects the Coordinator's progress nor stops its own processing.

### Alternatives considered

1. Gradually slow down Worker processing instead of pausing it completely. The KC framework can automatically unpause the Worker once the [SinkTaskContext.timeout()](https://github.com/apache/kafka/blob/4.3/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java#L71) passes. We could use this feature to set increasingly large timeout() values depending on how many commit cycles have failed on the Coordinator side, but we feel this would complicate the current design and prefer to defer it to a new feature proposal.
2. Use RPC-style communication between the Coordinator and the Workers, or introduce new message types on the control topic. This is unnecessary because the existing START_COMMIT and COMMIT_COMPLETE messages already indicate the Coordinator's progress well.
3. Use another backpressure control mechanism, such as Flink's watermark. We feel this would increase the scope of the work significantly.

### Proposal document

_No response_

### Specifications

- [ ] Table
- [ ] View
- [ ] REST
- [ ] Puffin
- [ ] Encryption
- [x] Other

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
