HenryCaiHaiying opened a new issue, #16389:
URL: https://github.com/apache/iceberg/issues/16389

   ### Proposed Change
   We propose a simple backpressure mechanism between the 
iceberg-kafka-connect Coordinator and Workers, for use when the Coordinator is 
overloaded or failing.  Each Worker would track the Coordinator’s commit 
progress and pause itself when the Coordinator stalls, which prevents runaway 
growth of the control topic backlog the Coordinator has to process.
   
   ### The background of the iceberg-kafka-connect design
   In the iceberg-kafka-connect framework, N Workers are launched in the 
cluster.  They consume Kafka messages from the source topic partitions and 
write the transformed data to S3 (or another data storage system).  There is 
only one Coordinator, which writes the metadata to the Iceberg Catalog Server 
(e.g. HiveMetaStore).  The metadata includes the list of data file names each 
Worker has written to S3.  The Workers and the Coordinator communicate with 
each other through a special Kafka control topic. 
   
   The original design allows the Worker and Coordinator to operate in a 
decoupled fashion.  The worker consumes Kafka source topic partition data, 
transforms and writes to S3 on its own cadence.  The Coordinator wakes up 
periodically and starts a commit cycle.  During the commit cycle, the 
Coordinator gathers metadata information from workers through the messages on 
the control topic:
   1. The Coordinator 
[sends](https://github.com/apache/iceberg/blob/main/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L120)
 the START_COMMIT message to the control topic
   2. Each Worker reads the START_COMMIT message and reports back with a 
DATA_WRITTEN message for each Kafka source topic partition it has processed 
(and a DATA_COMPLETE message for the Worker’s overall status).  Those messages 
are published on the control topic;
   3. The Coordinator 
[reads](https://github.com/apache/iceberg/blob/main/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L143)
 the DATA_WRITTEN messages, extracts their content (e.g. the names of the data 
files the Worker wrote to S3) and writes that information to the Iceberg 
Catalog Server;
   4. When all writes to the Iceberg Catalog Server finish successfully, the 
Coordinator commits to the Iceberg Catalog Server and publishes a 
COMMIT_COMPLETE message to the control topic.
   
   On the control topic, control signals flow in one direction only: from the 
Coordinator to the Workers.  Of the messages the Coordinator publishes, the 
Worker processes only START_COMMIT.
   
   ### The Problem
   There is no backpressure control from the Coordinator to the Worker.  
   
   When there is a network issue or an Iceberg Catalog Server availability 
issue, the Coordinator cannot write all the metadata entries it needs to 
commit, so it has to fail the current commit.  It then waits for the next 
commit cycle _(iceberg.control.commit.interval-ms, default 300 seconds)_ and 
retries.  Because the commit is atomic, the retry has to rewrite every metadata 
entry.  During those few minutes, however, all the Workers keep working: they 
consume more from the Kafka source topic partitions and generate more data 
files on S3.  When the Coordinator next tries to commit, it has to write all 
the metadata entries from the failed attempt plus all the new ones generated in 
the last 300 seconds.
   
   If the network issue around the Iceberg Catalog Server persists, the second 
commit cycle is likely to fail as well.  As time goes on, the backlog of 
messages waiting to be processed in the control topic keeps growing, and the 
Coordinator cannot finish a successful commit without manual intervention, 
even long after the initial network issue is resolved.  In our environment with 
a few thousand KC worker tasks, we have seen a 15-minute HMS stability issue 
cause iceberg-kafka-connect to underperform for hours afterwards, until someone 
intervened manually.
   
   In a distributed data system, there is usually built-in backpressure 
control: when writes to downstream systems run into problems, reads from 
upstream systems slow down so as not to exacerbate the situation.  The 
iceberg-kafka-connect Worker has this natural backpressure when it has problems 
writing to S3, but there is no feedback signal from the Coordinator to the 
Worker when the Coordinator is in trouble.
   
   ### The Solution
   Building a sophisticated backpressure control feedback loop between the 
Coordinator and the Worker can be complex, but there is a relatively simple 
solution here.
   
   #### The Idea 
   
   ##### Worker can detect that Coordinator is in trouble:
   The Worker can read both START_COMMIT and COMMIT_COMPLETE messages from the 
control topic.  When the Worker sees a START_COMMIT message without a matching 
COMMIT_COMPLETE message, it knows the Coordinator did not finish the Iceberg 
commit in that cycle.  If this happens for several consecutive cycles, it knows 
the Coordinator is in trouble and that it needs to slow down or pause.
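
   The detection rule above can be sketched as a small counter.  This is only 
an illustration under stated assumptions: the class and method names are 
hypothetical and not part of iceberg-kafka-connect, and a cycle is counted as 
failed only once the next START_COMMIT arrives with no COMMIT_COMPLETE in 
between.

```java
// Hypothetical sketch; names are illustrative, not the connector's API.
final class CoordinatorStallDetector {
  private final int stalledCycles;     // threshold of consecutive failed cycles
  private boolean cycleOpen = false;   // a START_COMMIT was seen and not yet completed
  private int failedCycles = 0;        // consecutive cycles that never completed

  CoordinatorStallDetector(int stalledCycles) {
    if (stalledCycles < 1) {
      throw new IllegalArgumentException("stalledCycles must be >= 1");
    }
    this.stalledCycles = stalledCycles;
  }

  // Called when the Worker reads START_COMMIT from the control topic.
  void onStartCommit() {
    if (cycleOpen) {
      // The previous cycle never produced a COMMIT_COMPLETE, so it failed.
      failedCycles++;
    }
    cycleOpen = true;
  }

  // Called when the Worker reads COMMIT_COMPLETE from the control topic.
  void onCommitComplete() {
    cycleOpen = false;
    failedCycles = 0;
  }

  boolean isStalled() {
    return failedCycles >= stalledCycles;
  }
}
```

   With a threshold of 3, the detector reports a stall on the fourth 
consecutive START_COMMIT that arrives without an intervening COMMIT_COMPLETE, 
and a single COMMIT_COMPLETE clears it.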
   
   ##### Worker can pause when the Coordinator is in trouble
   When the Worker detects that the Coordinator is stalled, it can simply pause 
its work.  Although a more sophisticated mechanism could have the Worker slow 
down gradually, we want to keep the initial implementation simple.  A simple 
pause relieves the pressure on the Coordinator side: the Coordinator can focus 
on finishing its commit work and draining all the messages from the control 
topic.
   
   The way to pause processing for the Worker in the Kafka Connect framework is 
to [throw a RetriableException from the 
IcebergSinkTask.put()](https://github.com/apache/kafka/blob/4.3/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L97)
 method.  IcebergSinkTask.put() eventually calls Worker.save(), which is where 
the Worker can throw the exception.  Once the KC framework catches the 
RetriableException from the put() call, [it will pause the task and the 
source 
consumer.](https://github.com/apache/kafka/blob/4.3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L647)
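
   A minimal sketch of that gate, under stated assumptions: `PausableWriter` is 
a hypothetical stand-in for the Worker, records are plain strings, and the 
local `RetriableException` class stands in for 
`org.apache.kafka.connect.errors.RetriableException` so that the example 
compiles without Kafka on the classpath.

```java
// Stand-in for org.apache.kafka.connect.errors.RetriableException (assumption:
// a real implementation would import the Kafka Connect class instead).
class RetriableException extends RuntimeException {
  RetriableException(String message) {
    super(message);
  }
}

// Hypothetical stand-in for the Worker; not the connector's actual class.
final class PausableWriter {
  private volatile boolean paused = false;
  private final java.util.List<String> written = new java.util.ArrayList<>();

  void setPaused(boolean paused) {
    this.paused = paused;
  }

  // Mirrors the role of Worker.save(): reject the whole batch while paused.
  void save(java.util.Collection<String> records) {
    if (paused) {
      // Throw before touching any record so a redelivered batch is not
      // partially written twice.
      throw new RetriableException(
          "Coordinator is stalled; deferring " + records.size() + " records");
    }
    written.addAll(records);
  }

  int writtenCount() {
    return written.size();
  }
}
```

   The check sits at the top of `save()` on purpose: the framework retries the 
same batch later, so nothing from a rejected batch should have been written.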
   
   ##### Worker can resume once Coordinator is no longer in stalled state
   Once the Coordinator is able to finish one commit cycle, it will publish the 
COMMIT_COMPLETE message on the control topic.  When the Worker reads that 
message, it can unpause itself, meaning it will no longer throw a 
RetriableException from the IcebergSinkTask.put() call.
   
   #### The Proposal
   ##### Proposed spec changes:
   Add two optional config parameters: one controls whether coordinator 
progress detection is enabled, and the other sets how many consecutive failed 
commit cycles mark the coordinator as stalled.
   
   - property: iceberg.coordinator.progress.detection.enabled; 
   - - default: false; 
   - - description: Whether the worker will check the coordinator commit 
progress;
   - property: iceberg.coordinator.progress.stalled.cycles; 
   - - default: 3; 
   - - description: How many cycles of failed coordinator commit to mark 
coordinator stalled
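
   As an illustration of how the two properties and their defaults could be 
read, here is a sketch that parses them from a plain string map.  The 
`ProgressDetectionConfig` class is hypothetical; the real connector would 
presumably declare these alongside its existing options rather than parse them 
by hand.

```java
// Hypothetical holder for the two proposed properties; illustrative only.
final class ProgressDetectionConfig {
  static final String ENABLED_PROP =
      "iceberg.coordinator.progress.detection.enabled";
  static final String STALLED_CYCLES_PROP =
      "iceberg.coordinator.progress.stalled.cycles";

  final boolean enabled;
  final int stalledCycles;

  ProgressDetectionConfig(java.util.Map<String, String> props) {
    // Defaults follow the proposal: detection off, 3 failed cycles.
    this.enabled =
        Boolean.parseBoolean(props.getOrDefault(ENABLED_PROP, "false"));
    this.stalledCycles =
        Integer.parseInt(props.getOrDefault(STALLED_CYCLES_PROP, "3"));
    if (stalledCycles < 1) {
      throw new IllegalArgumentException(
          STALLED_CYCLES_PROP + " must be >= 1, got " + stalledCycles);
    }
  }
}
```

   Rejecting values below 1 is an assumption of this sketch; the proposal does 
not specify a valid range.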
   
   #### Changes to iceberg-kafka-connect classes
   ##### Worker class
   - receive() method: it will process both START_COMMIT and COMMIT_COMPLETE 
messages to count how many consecutive commit cycles the Coordinator has 
failed.  Once it detects enough failed cycles, it will put itself in a PAUSED 
state.  Once it sees a new COMMIT_COMPLETE, it will clear the PAUSED state;
   - save() method: if it is in the PAUSED state, it will throw a 
RetriableException instead of processing the incoming sinkRecords.
   
   ### Breaking changes/incompatibilities
   None.  The new behavior is controlled by the config parameter 
iceberg.coordinator.progress.detection.enabled, which defaults to false.  If 
this config parameter is not enabled, the Worker keeps the old behavior: it 
neither tracks the Coordinator’s progress nor pauses its own processing.
   
   ### Alternatives considered
   
   1. Gradually slow down worker processing instead of completely 
pausing/stopping.  KC framework has the capability to automatically unpause the 
worker once 
[SinkTaskContext.timeout()](https://github.com/apache/kafka/blob/4.3/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java#L71)
 passes. We can use this feature to set increasingly large timeout() values 
depending on how many commit cycles failed on the Coordinator side, but we feel 
this might complicate the current design and we prefer to defer this to a new 
feature proposal.
   2. RPC-style communication between the Coordinator and the Workers, or new 
message types on the control topic.  This is unnecessary, since the existing 
START_COMMIT and COMMIT_COMPLETE messages work well to indicate the 
Coordinator’s progress;
   3. Other backpressure control mechanisms, such as Flink’s watermarks.  We 
feel this would increase the scope of the work significantly.
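
   For reference, the increasing timeout values mentioned in alternative 1 
could be computed roughly as below.  This is a sketch of a deferred idea, not 
part of this proposal: `PauseBackoff`, its parameters, and the doubling policy 
are all assumptions.  The result would be handed to `SinkTaskContext.timeout()` 
before throwing the RetriableException.

```java
// Hypothetical helper; the doubling policy and parameter names are assumptions.
final class PauseBackoff {
  private PauseBackoff() {}

  // Returns baseMs doubled once per additional failed cycle, capped at maxMs.
  static long timeoutMs(int failedCycles, long baseMs, long maxMs) {
    if (baseMs <= 0 || maxMs < baseMs) {
      throw new IllegalArgumentException("require 0 < baseMs <= maxMs");
    }
    if (failedCycles <= 0) {
      return 0L; // Coordinator is healthy, no pause needed
    }
    long timeout = baseMs;
    for (int i = 1; i < failedCycles && timeout < maxMs; i++) {
      // Compare against maxMs / 2 first so the doubling cannot overflow.
      timeout = (timeout > maxMs / 2) ? maxMs : timeout * 2;
    }
    return Math.min(timeout, maxMs);
  }
}
```

   With a base of 1 second and a cap of 60 seconds, the first three failed 
cycles yield 1, 2 and 4 seconds, and the value stays at the cap from the 
seventh failed cycle onwards.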
   
   
   ### Proposal document
   
   _No response_
   
   ### Specifications
   
   - [ ] Table
   - [ ] View
   - [ ] REST
   - [ ] Puffin
   - [ ] Encryption
   - [x] Other


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

