HenryCaiHaiying commented on issue #16389: URL: https://github.com/apache/iceberg/issues/16389#issuecomment-4569151790
@laskoviymishka Thanks for the detailed review. I have incorporated some of your suggestions into the updated RFC proposal: a section on metrics, an explicit pause interval, and jitter on worker resumption. Before updating more of the content, I would like some more discussion and clarification on the points below.

For [#16361](https://github.com/apache/iceberg/issues/16361), the problem is mainly inefficient processing of the entries in readyBuffer, and that processing happens in memory. We can improve it from O(N^2) to O(N), but that will not remove the bottleneck of backlog building and processing. Most of the backlog processing time is spent reading Kafka messages and writing to the Iceberg Catalog Server. By the way, would you mind also taking a look at the [proposed fix](https://github.com/apache/iceberg/pull/16453) for that bug? It is a simple one-line fix.

You asked whether we should use the signal { pending_files, pending_partitions, last_commit_outcome } instead of 3 stalled commit cycles. I feel that n stalled commit cycles is easier to understand than thresholds on pending_files/pending_partitions. pending_files/pending_partitions give finer control for monitoring the coordinator's progress, but they also create overhead for the operator, who has to find a good alert threshold.

An operator is probably managing and monitoring multiple Iceberg ingestion pipelines, each processing a different Kafka topic. Each topic differs in processing speed, number of partitions, and number of files written per time period. Those numbers can also change over time, or with the time of day or week. A single pipeline might ingest data from multiple topic partitions as well, so finding a good threshold is not easy. By contrast, n stalled commit cycles is much more intuitive to grasp.
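To show how little configuration the n-stalled-cycles signal needs, here is a minimal sketch. It assumes a pause is triggered once a worker has read n START_COMMIT events from the control topic with no COMMIT_COMPLETE in between. `StalledCommitTracker` and its methods are hypothetical names for illustration, not code from the RFC or the connector.

```java
/**
 * Hypothetical sketch: counts START_COMMIT events seen since the last COMMIT_COMPLETE
 * and reports when the worker should pause. Names are illustrative only.
 */
final class StalledCommitTracker {
  private final int maxPendingStarts; // the "n" in "n stalled commit cycles", e.g. 3
  private int startsSinceLastComplete = 0;

  StalledCommitTracker(int maxPendingStarts) {
    if (maxPendingStarts < 1) {
      throw new IllegalArgumentException("maxPendingStarts must be >= 1");
    }
    this.maxPendingStarts = maxPendingStarts;
  }

  /** The worker read a START_COMMIT event from the control topic. */
  void onStartCommit() {
    startsSinceLastComplete++;
  }

  /** The worker read a COMMIT_COMPLETE event: the coordinator caught up, so reset. */
  void onCommitComplete() {
    startsSinceLastComplete = 0;
  }

  /** True once n START_COMMIT events arrived without any COMMIT_COMPLETE in between. */
  boolean shouldPause() {
    return startsSinceLastComplete >= maxPendingStarts;
  }
}
```

The only operator-facing knob here is n. It does not depend on topic throughput, partition count, or files written per period, so the same value can plausibly be reused across pipelines.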
A healthy pipeline is structured and tuned so that the coordinator and workers operate together at a steady cadence. The coordinator should be able to commit what the workers have accumulated within one commit cycle. If the coordinator cannot commit, that indicates a mismatch in processing speed. Several consecutive cycles without a successful commit indicate that action is needed to relieve the pressure.

Regarding the initial-state grace period, I wasn't sure what the exact problem is with a new worker joining mid-cycle. The new worker reads the control topic sequentially. If a COMMIT_COMPLETE message is published between START_COMMIT messages, the worker will read it before it reaches its third START_COMMIT. Are you worried that the coordinator changes 3 times during group rebalances, with each coordinator publishing a START_COMMIT message but never a COMMIT_COMPLETE message? In that situation, I am open to either approach:
1. Use your suggestion: the worker's counter only starts once it has observed one complete START_COMMIT -> COMMIT_COMPLETE cycle.
2. Pause the worker preemptively. If there are 3 consecutive group rebalances in a short period and no coordinator can finish its work, the system is probably in a churning or dying loop. The worker is better off pausing and waiting than adding more work that exacerbates the situation.

Regarding partial commit, the problem is that once the backlog builds up, even a partial commit cannot publish the COMMIT_COMPLETE message. In [Coordinator.doCommit(boolean partialCommit)](https://github.com/apache/iceberg/blob/main/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L173), three steps cannot be reached if any of the tasks writing metadata to an Iceberg table fails: commitConsumerOffset(), commitState.clearResponse(), and publishing the COMMIT_COMPLETE event.
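Option 1 can be sketched by arming the counter only after the worker has observed a full START_COMMIT -> COMMIT_COMPLETE cycle. A COMMIT_COMPLETE that arrives before any START_COMMIT, as when a worker joins mid-cycle, does not arm it. `GracedStallTracker` and its methods are hypothetical names, not part of the RFC or the connector.

```java
/**
 * Hypothetical sketch of option 1: stall counting starts only after the worker has seen
 * one complete START_COMMIT -> COMMIT_COMPLETE cycle since it joined.
 */
final class GracedStallTracker {
  private final int maxPendingStarts; // e.g. 3
  private boolean sawStart = false;   // a START_COMMIT was observed since joining
  private boolean armed = false;      // a full START_COMMIT -> COMMIT_COMPLETE cycle was observed
  private int startsSinceLastComplete = 0;

  GracedStallTracker(int maxPendingStarts) {
    if (maxPendingStarts < 1) {
      throw new IllegalArgumentException("maxPendingStarts must be >= 1");
    }
    this.maxPendingStarts = maxPendingStarts;
  }

  void onStartCommit() {
    sawStart = true;
    if (armed) {
      startsSinceLastComplete++; // only count once the grace period is over
    }
  }

  void onCommitComplete() {
    if (sawStart) {
      armed = true; // this COMMIT_COMPLETE closes a cycle whose START_COMMIT we saw
    }
    startsSinceLastComplete = 0;
  }

  boolean shouldPause() {
    return armed && startsSinceLastComplete >= maxPendingStarts;
  }
}
```

Option 2 amounts to dropping the `armed` check, so that counting starts the moment the worker joins. Under repeated rebalances, the worker would then pause rather than wait for a cycle that may never complete.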
The backlog initially starts building up because of network issues with the Iceberg Catalog Server. Once those initial failures occur, the commitBuffer is never cleared, and more work keeps being added to it as each worker keeps making progress and generating DATA_WRITTEN messages. Over time the commitBuffer grows so large that it cannot all be committed within COORDINATOR_EXECUTOR_KEEP_ALIVE_TIMEOUT_MS. One possible fix is to clear CommitState.commitBuffer on a partial commit, but I am not sure that is a good idea, since that approach loses data.

Regarding a stable-success threshold before resuming (the worker being too eager to resume): during the pause period the worker cannot consume any new Kafka messages, because the Kafka consumer in the KC framework is paused. On resumption, the worker will receive a large batch of Kafka messages, so any load problem will show up first on the worker VM, before it affects coordinator processing. This is similar to the pipeline's initial bootstrapping phase, when workers need to read a lot of Kafka messages from the beginning. Currently the iceberg-kafka-connect component doesn't have its own throttling mechanism. It relies on the Kafka consumer's max.poll.records (default 500) to limit how many records are processed in each poll cycle. I think that if the pipeline can handle the burst load during the initial bootstrap stage, it can also handle the load during the pause/resumption phase. I am also open to adding throttling in the worker. It would probably take the form of a local rate limiter within the worker, rather than having the worker wait several more cycles, since waiting doesn't relieve the pressure building up in the upstream Kafka topics.

Regarding adding a jitter delay when the worker restarts: yes, that is a good suggestion, and the RFC has been updated accordingly.
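If worker-side throttling is added, one common form of "local rate limiter" is a token bucket. Here is a minimal sketch of that technique. The class name, the capacity and rate parameters, and the injectable nanosecond clock are all hypothetical; nothing in the connector defines them today. The capacity bounds the burst a resumed worker can process at once, and the rate bounds sustained throughput.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical local rate limiter for a worker, using the classic token-bucket technique:
 * up to `capacity` records may be taken at once, refilled continuously at `recordsPerSecond`.
 */
final class TokenBucket {
  private final double capacity;         // maximum burst, in records
  private final double recordsPerSecond; // sustained refill rate
  private final LongSupplier nanoClock;  // e.g. System::nanoTime; injectable for tests
  private double tokens;
  private long lastRefillNanos;

  TokenBucket(double capacity, double recordsPerSecond, LongSupplier nanoClock) {
    if (capacity <= 0 || recordsPerSecond <= 0) {
      throw new IllegalArgumentException("capacity and rate must be positive");
    }
    this.capacity = capacity;
    this.recordsPerSecond = recordsPerSecond;
    this.nanoClock = nanoClock;
    this.tokens = capacity; // start full, like an idle worker
    this.lastRefillNanos = nanoClock.getAsLong();
  }

  /** Takes `records` tokens and returns true if enough are available; otherwise takes nothing. */
  synchronized boolean tryAcquire(int records) {
    long now = nanoClock.getAsLong();
    double refill = (now - lastRefillNanos) * recordsPerSecond / 1_000_000_000.0;
    tokens = Math.min(capacity, tokens + refill); // never exceed the burst capacity
    lastRefillNanos = now;
    if (tokens >= records) {
      tokens -= records;
      return true;
    }
    return false;
  }
}
```

When `tryAcquire` returns false, the task could take the same context.timeout() plus RetriableException path used for the pause, so that Kafka Connect redelivers the batch later instead of the worker buffering it. Unlike waiting extra commit cycles, this keeps records flowing at a bounded rate.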
Regarding adding an explicit pause duration (rather than co-mingling it with Kafka's offset.flush.interval.ms): yes, that is a good suggestion, and the RFC has been updated. Our worker therefore needs to call context.timeout(iceberg.coordinator.progress.pause-ms) programmatically each time before it throws RetriableException.

Regarding the errors.retry.timeout.ms prerequisite, I don't think that parameter interferes with us. Kafka Connect's errors.retry.timeout controls the RetryWithToleranceOperator class, which is used by WorkerSinkTask.convertAndTransformRecord(), which in turn is called from [WorkerSinkTask.convertMessages()](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L358). Our SinkTask is only invoked on the next line, via deliverMessages(), so the retry operator does not wrap it.
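Here is a minimal sketch of the pause step, assuming the worker checks for a stall at the start of put(). To keep it self-contained, two stand-ins replace Kafka Connect types. `PauseContext` stands in for Kafka Connect's SinkTaskContext, modelling only its `timeout(long)` method. `RetryLater` stands in for `org.apache.kafka.connect.errors.RetriableException`. `ProgressGuard`, `pauseMs`, and `maxJitterMs` are hypothetical names. Adding the resumption jitter to the timeout is one possible way to implement the jitter, not necessarily what the RFC specifies.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongUnaryOperator;

/** Stand-in for Kafka Connect's SinkTaskContext; only timeout(long) is modelled here. */
interface PauseContext {
  void timeout(long timeoutMs);
}

/** Stand-in for org.apache.kafka.connect.errors.RetriableException. */
final class RetryLater extends RuntimeException {
  RetryLater(String message) {
    super(message);
  }
}

/** Hypothetical helper: pauses the task for pauseMs plus a random jitter when commits stall. */
final class ProgressGuard {
  private final PauseContext context;
  private final long pauseMs;             // hypothetical value of iceberg.coordinator.progress.pause-ms
  private final long maxJitterMs;         // hypothetical upper bound on the resumption jitter
  private final LongUnaryOperator jitter; // maps a bound to a value in [0, bound]

  ProgressGuard(PauseContext context, long pauseMs, long maxJitterMs, LongUnaryOperator jitter) {
    this.context = context;
    this.pauseMs = pauseMs;
    this.maxJitterMs = maxJitterMs;
    this.jitter = jitter;
  }

  /** Default: uniform random jitter in [0, maxJitterMs], so workers do not all resume together. */
  ProgressGuard(PauseContext context, long pauseMs, long maxJitterMs) {
    this(context, pauseMs, maxJitterMs, bound -> ThreadLocalRandom.current().nextLong(bound + 1));
  }

  /** No-op when healthy; otherwise sets the retry timeout first, then throws. */
  void pauseIfStalled(boolean stalled) {
    if (!stalled) {
      return;
    }
    long delayMs = pauseMs + jitter.applyAsLong(maxJitterMs);
    context.timeout(delayMs); // must be set before throwing so the retry waits delayMs
    throw new RetryLater("coordinator commits are stalled; pausing for " + delayMs + " ms");
  }
}
```

In a real task the `stalled` flag would come from a tracker such as the ones sketched earlier. The SinkTaskContext javadoc describes `timeout()` as the mechanism for a task that wants to retry the same batch of records, so a RetriableException thrown from put() leads Kafka Connect to redeliver that batch rather than drop it.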
