HenryCaiHaiying commented on issue #16389:
URL: https://github.com/apache/iceberg/issues/16389#issuecomment-4569151790

   @laskoviymishka Thanks for the detailed review comment. I have incorporated
some of your suggestions into the updated RFC proposal (adding a section for
metrics, adding an explicit pause interval, adding jitter on worker resumption),
but I need some more discussion/clarification before updating the rest.
   
   For [#16361](https://github.com/apache/iceberg/issues/16361), the issue is
mainly the inefficient processing of entries in readyBuffer, which happens
entirely in memory. We can improve that in-memory processing from O(N^2) to
O(N), but doing so will not remove the bottleneck of backlog building/processing.
Most of the backlog processing time is spent reading Kafka messages and writing
to the Iceberg Catalog Server. By the way, would you mind also taking a look at
the [proposed fix](https://github.com/apache/iceberg/pull/16453) for that bug?
It is a simple one-line fix.
   
   On the question of whether we should use the signal { pending_files,
pending_partitions, last_commit_outcome } instead of 3 stalled commit cycles, I
feel that n stalled commit cycles is easier to understand than tuning the
number of pending_files/pending_partitions. pending_files/pending_partitions
gives finer control for monitoring the coordinator's progress. However, it
becomes an overhead for the operator to find a good value to use as the alert
threshold. An operator is probably managing/monitoring multiple Iceberg
ingestion pipelines, each processing a different Kafka topic. Each Kafka topic
has different characteristics in terms of processing speed, number of
partitions, and number of files written per time period. Those numbers can
change over time or with the time of day/week. One pipeline might also ingest
data from multiple topic partitions, so finding a good threshold is not going
to be easy. On the other hand, n stalled commit cycles is much more intuitive.
A healthy pipeline is structured/tuned so that the coordinator and the workers
work together at a steady cadence. The coordinator is supposed to be able to
commit what the workers have accumulated within one commit cycle. When the
coordinator cannot commit, that indicates a mismatch in processing speed.
Several cycles of failed commits indicate that action is needed to relieve the
pressure.
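As a rough illustration of the n-stalled-cycles signal, here is a minimal Java sketch. The class and method names are hypothetical, not existing Iceberg code. It assumes the worker feeds it the START_COMMIT and COMMIT_COMPLETE events it reads from the control topic. It treats n START_COMMIT events without an intervening COMMIT_COMPLETE as a stall.

```java
/**
 * Hypothetical sketch (not existing Iceberg code): tracks how many START_COMMIT
 * events a worker has seen since the last COMMIT_COMPLETE and reports a stall
 * once that count reaches a configured threshold (e.g. 3).
 */
final class CommitStallDetector {
    private final int stallThreshold;
    private int startsSinceLastComplete = 0;

    CommitStallDetector(int stallThreshold) {
        if (stallThreshold < 1) {
            throw new IllegalArgumentException("stallThreshold must be >= 1");
        }
        this.stallThreshold = stallThreshold;
    }

    /** Call when a START_COMMIT event is read from the control topic. */
    void onStartCommit() {
        startsSinceLastComplete++;
    }

    /** Call when a COMMIT_COMPLETE event is read; a completed commit resets the count. */
    void onCommitComplete() {
        startsSinceLastComplete = 0;
    }

    /** True once n commit cycles have started without any of them completing. */
    boolean isStalled() {
        return startsSinceLastComplete >= stallThreshold;
    }
}
```

The only knob here is n. Its meaning does not depend on topic throughput, partition count, or file volume, which is the property argued for above.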
   
   Regarding the initial-state grace period, I wasn't sure about the exact
problem with a new worker joining mid-cycle. The new worker reads the Kafka
messages sequentially from the control topic. If a COMMIT_COMPLETE message was
published in between, the worker will read it before it has read 3
START_COMMIT messages. Are you worried that the coordinator switched 3 times
during group rebalances, and that each coordinator published a START_COMMIT
message without ever publishing a COMMIT_COMPLETE message? In that situation, I
am open to either approach:

1. Use your suggestion: the worker's counter only starts once it has observed
   one complete START_COMMIT -> COMMIT_COMPLETE cycle.
2. Pause the worker preemptively. If there are 3 consecutive group rebalances in
   a short period of time and no coordinator can finish its work, the system is
   probably in a churning/dying loop. The worker is better off pausing/waiting
   than adding more work that exacerbates the situation.
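Option 1 could look roughly like the following Java sketch. The class name and fields are hypothetical, for illustration only. The counter is armed only after the worker has seen a START_COMMIT followed by a COMMIT_COMPLETE. A COMMIT_COMPLETE read before any START_COMMIT (the worker joined mid-cycle) does not arm it.

```java
/**
 * Hypothetical sketch of option 1: the stall counter is only armed after the
 * worker has observed one full START_COMMIT -> COMMIT_COMPLETE cycle, so a
 * worker that joins mid-cycle is not paused by a cycle it only saw partially.
 */
final class GracePeriodStallDetector {
    private final int stallThreshold;
    private boolean sawStartCommit = false; // any START_COMMIT seen since this worker joined
    private boolean armed = false;          // a full cycle has been observed
    private int startsSinceLastComplete = 0;

    GracePeriodStallDetector(int stallThreshold) {
        if (stallThreshold < 1) {
            throw new IllegalArgumentException("stallThreshold must be >= 1");
        }
        this.stallThreshold = stallThreshold;
    }

    void onStartCommit() {
        sawStartCommit = true;
        if (armed) {
            startsSinceLastComplete++;
        }
    }

    void onCommitComplete() {
        // A COMMIT_COMPLETE after a START_COMMIT we observed closes a full cycle;
        // one read before any START_COMMIT (joined mid-cycle) does not arm the counter.
        if (sawStartCommit) {
            armed = true;
        }
        startsSinceLastComplete = 0;
    }

    boolean isStalled() {
        return armed && startsSinceLastComplete >= stallThreshold;
    }
}
```

This variant has a trade-off. In the repeated-rebalance scenario no COMMIT_COMPLETE ever arrives, so a freshly joined worker never arms and never pauses. A counter that starts immediately (option 2) would pause in that case.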
   
   Regarding partial commit, the problem when the backlog builds up is that even
a partial commit cannot publish the COMMIT_COMPLETE message. In
[Coordinator.doCommit(boolean partialCommit)](https://github.com/apache/iceberg/blob/main/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L173),
the code for commitConsumerOffset(), commitState.clearResponse() and publishing
the COMMIT_COMPLETE event cannot be reached if one of the tasks writing metadata
to an Iceberg table fails. The backlog initially starts building because of
network issues with the Iceberg Catalog Server. Once we hit those initial
failures, however, the commitBuffer is never cleared. It keeps growing as each
worker keeps making progress and generating more DATA_WRITTEN messages. Over
time the commitBuffer becomes so big that the coordinator cannot commit
everything within COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS. One possible fix is
to clear CommitState.commitBuffer on a partial commit. I am not sure that is a
good idea, though, since we would lose data with this approach.
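To make the control-flow problem concrete, here is a deliberately simplified Java model. It is not the real Coordinator code: table commits run sequentially here, and the field and method names are illustrative stand-ins. It shows only the shape described above. A failure in any per-table commit exits before the offset commit, the buffer clear and the COMMIT_COMPLETE publish, so the buffer only grows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Simplified, hypothetical model of the control flow described above; it is
 * NOT the real Coordinator code. Table commits run sequentially here, and the
 * names are illustrative stand-ins.
 */
final class CommitFlowModel {
    final List<String> commitBuffer = new ArrayList<>(); // stand-in for buffered DATA_WRITTEN payloads
    boolean commitCompletePublished = false;

    void doCommit(List<String> tables, Consumer<String> commitTable) {
        for (String table : tables) {
            commitTable.accept(table); // a catalog failure here throws and exits doCommit...
        }
        // ...so on failure none of the cleanup below runs and the buffer keeps growing.
        commitConsumerOffsets();
        commitBuffer.clear();
        commitCompletePublished = true; // stand-in for publishing COMMIT_COMPLETE
    }

    private void commitConsumerOffsets() {
        // stand-in for committing the control-topic consumer offsets
    }
}
```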
   
   Regarding the stable-success threshold before resuming (the
worker-being-too-eager-to-resume concern): during the pause period, the worker
cannot consume any new Kafka messages, because the Kafka consumer in the Kafka
Connect (KC) framework is paused. On resumption, the worker will receive many
more Kafka messages at once. That creates a load problem on the Worker VM
first, before it affects Coordinator processing. This is similar to the
pipeline's initial bootstrapping phase, when the workers need to read a lot of
Kafka messages from the beginning. Currently the iceberg-kafka-connect component
does not implement its own flow control. We rely on the Kafka consumer's
max.poll.records (default 500) to control how many records are processed in
each poll cycle. I think that if the pipeline can handle the burst load during
the initial bootstrap stage, it will also be able to handle the load during the
pause/resumption phase. I am also open to adding some throttling control in the
worker. It would most likely take the form of a local rate limiter within the
worker, rather than making the worker wait for several more cycles, since
waiting doesn't relieve the pressure building up from the upstream Kafka topics.
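If we do add worker-side throttling, a local token-bucket rate limiter is one common way to do it. The Java sketch below is hypothetical; neither the class nor its parameters exist in iceberg-kafka-connect today. It admits on average ratePerSecond records per second, with bursts of up to capacity records. The clock is injected so the behavior is testable.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical per-worker token bucket (not existing iceberg-kafka-connect code):
 * admits on average ratePerSecond records per second, with bursts up to capacity.
 */
final class RecordRateLimiter {
    private final double capacity;
    private final double tokensPerNano;
    private final LongSupplier nanoClock;
    private double tokens;
    private long lastRefillNanos;

    RecordRateLimiter(double ratePerSecond, double capacity, LongSupplier nanoClock) {
        if (ratePerSecond <= 0 || capacity <= 0) {
            throw new IllegalArgumentException("rate and capacity must be positive");
        }
        this.capacity = capacity;
        this.tokensPerNano = ratePerSecond / 1_000_000_000.0;
        this.nanoClock = nanoClock;
        this.tokens = capacity; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Consumes tokens and returns true if a batch of this size may be processed now. */
    synchronized boolean tryAcquire(int records) {
        refill();
        if (tokens >= records) {
            tokens -= records;
            return true;
        }
        return false;
    }

    /** Adds tokens for the time elapsed since the last refill, capped at capacity. */
    private void refill() {
        long now = nanoClock.getAsLong();
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * tokensPerNano);
        lastRefillNanos = now;
    }
}
```

In production the clock would be System::nanoTime. Unlike waiting several more commit cycles, this caps the resumption burst on the worker itself while still letting records flow.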
   
   Regarding adding a jitter delay when the worker restarts: yes, that is a good
suggestion, and the RFC has been updated accordingly.
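A minimal sketch of the jitter computation follows. The helper name and the maxJitterMs parameter are hypothetical, not necessarily what the RFC specifies. Each worker adds a uniform random delay in [0, maxJitterMs) to the base pause, so workers paused at the same time do not all resume at the same instant.

```java
import java.util.Random;

/**
 * Hypothetical helper (names are illustrative, not from the RFC): the delay
 * before a paused worker resumes is the configured pause plus a uniform random
 * jitter in [0, maxJitterMs), so workers paused together spread out on resume.
 */
final class ResumeJitter {
    private ResumeJitter() {}

    static long resumeDelayMs(long pauseMs, int maxJitterMs, Random random) {
        if (pauseMs < 0 || maxJitterMs < 0) {
            throw new IllegalArgumentException("pauseMs and maxJitterMs must be non-negative");
        }
        // Random.nextInt(bound) requires bound > 0, so a zero jitter range is special-cased.
        long jitterMs = maxJitterMs == 0 ? 0 : random.nextInt(maxJitterMs);
        return pauseMs + jitterMs;
    }
}
```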
   
   Regarding adding an explicit pause duration (not co-mingled with Kafka's
offset.flush.interval.ms): yes, that is a good suggestion, and the RFC has been
updated. Our Worker therefore needs to call
context.timeout(iceberg.coordinator.progress.pause-ms) programmatically each
time before it throws RetriableException.
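The call order could look like the Java sketch below. To keep it self-contained, it uses PauseContext as a minimal stand-in for Kafka Connect's SinkTaskContext, whose real interface does have timeout(long). It also uses a local RetriableSinkException in place of org.apache.kafka.connect.errors.RetriableException. ProgressGate and checkProgress are hypothetical names, and how the stall flag is computed is left out.

```java
/** Minimal stand-in for Kafka Connect's SinkTaskContext; only timeout(long) is modeled. */
interface PauseContext {
    void timeout(long timeoutMs);
}

/** Local stand-in for org.apache.kafka.connect.errors.RetriableException. */
class RetriableSinkException extends RuntimeException {
    RetriableSinkException(String message) {
        super(message);
    }
}

/**
 * Hypothetical sketch: before rejecting a batch, set the retry timeout to the
 * explicit pause (iceberg.coordinator.progress.pause-ms), then throw the
 * retriable exception so the framework pauses and redelivers the batch later.
 */
final class ProgressGate {
    private final PauseContext context;
    private final long pauseMs;

    ProgressGate(PauseContext context, long pauseMs) {
        this.context = context;
        this.pauseMs = pauseMs;
    }

    /** Intended to run at the start of put(); does nothing while the coordinator is healthy. */
    void checkProgress(boolean coordinatorStalled) {
        if (!coordinatorStalled) {
            return;
        }
        context.timeout(pauseMs); // set on every pause, before the throw
        throw new RetriableSinkException(
            "coordinator has not completed a commit; pausing for " + pauseMs + " ms");
    }
}
```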
   
   Regarding the errors.retry.timeout prerequisite, I don't think that parameter
interferes with us. Kafka Connect's errors.retry.timeout controls the
RetryWithToleranceOperator class. That class is used by
WorkerSinkTask.convertAndTransformRecord(), which in turn is called from
[WorkerSinkTask.convertMessages()](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L358).
Our SinkTask is invoked from the next call, deliverMessages(). Exceptions thrown
by our put() are therefore outside that retry/tolerance handling.
   

