[ https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728266#comment-17728266 ]
Haruki Okada edited comment on KAFKA-15046 at 6/1/23 8:39 AM:
--------------------------------------------------------------

Hm, when I dug further into this, I noticed there's another path that causes essentially the same phenomenon.

{code:java}
"data-plane-kafka-request-handler-17" #169 daemon prio=5 os_prio=0 cpu=50994542.49ms elapsed=595635.65s tid=0x00007efdaebabe30 nid=0x1e707 runnable [0x00007ef9a0fdf000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
        at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
        at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
        at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:966)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:951)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:925)
        at org.apache.kafka.server.common.CheckpointFile.write(CheckpointFile.java:98)
        - locked <0x0000000680fc4930> (a java.lang.Object)
        at kafka.server.checkpoints.CheckpointFileWithFailureHandler.write(CheckpointFileWithFailureHandler.scala:37)
        at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:71)
        at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:291)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$3(LeaderEpochFileCache.scala:263)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$3$adapted(LeaderEpochFileCache.scala:259)
        at kafka.server.epoch.LeaderEpochFileCache$$Lambda$571/0x000000080045f040.apply(Unknown Source)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$1(LeaderEpochFileCache.scala:259)
        at kafka.server.epoch.LeaderEpochFileCache.truncateFromStart(LeaderEpochFileCache.scala:254)
        at kafka.log.UnifiedLog.$anonfun$maybeIncrementLogStartOffset$4(UnifiedLog.scala:1043)
        at kafka.log.UnifiedLog.$anonfun$maybeIncrementLogStartOffset$4$adapted(UnifiedLog.scala:1043)
        at kafka.log.UnifiedLog$$Lambda$2324/0x0000000800b59040.apply(Unknown Source)
        at scala.Option.foreach(Option.scala:437)
        at kafka.log.UnifiedLog.maybeIncrementLogStartOffset(UnifiedLog.scala:1043)
        - locked <0x0000000680fc5080> (a java.lang.Object)
        at kafka.cluster.Partition.$anonfun$deleteRecordsOnLeader$1(Partition.scala:1476)
        at kafka.cluster.Partition.deleteRecordsOnLeader(Partition.scala:1463)
        at kafka.server.ReplicaManager.$anonfun$deleteRecordsOnLocalLog$2(ReplicaManager.scala:687)
        at kafka.server.ReplicaManager$$Lambda$3156/0x0000000800d7c840.apply(Unknown Source)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at kafka.server.ReplicaManager.deleteRecordsOnLocalLog(ReplicaManager.scala:680)
        at kafka.server.ReplicaManager.deleteRecords(ReplicaManager.scala:875)
        at kafka.server.KafkaApis.handleDeleteRecordsRequest(KafkaApis.scala:2216)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:196)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
        at java.lang.Thread.run(java.base@11.0.17/Thread.java:829)
{code}

LeaderEpoch checkpointing also calls fsync while holding Log#lock, blocking request-handler threads from appending in the meantime.

This is called by the scheduler thread on log-segment-breaching, so it might be less frequent than log roll, though.

Does it make sense to also move the LeaderEpochCheckpointFile flush outside of the lock?


was (Author: ocadaruma):
Hm, when I dug further into this, I noticed there's another path that causes essentially the same phenomenon.
{code:java}
"data-plane-kafka-request-handler-17" #169 daemon prio=5 os_prio=0 cpu=50994542.49ms elapsed=595635.65s tid=0x00007efdaebabe30 nid=0x1e707 runnable [0x00007ef9a0fdf000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
        at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
        at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
        at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:966)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:951)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:925)
        at org.apache.kafka.server.common.CheckpointFile.write(CheckpointFile.java:98)
        - locked <0x0000000680fc4930> (a java.lang.Object)
        at kafka.server.checkpoints.CheckpointFileWithFailureHandler.write(CheckpointFileWithFailureHandler.scala:37)
        at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:71)
        at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:291)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$3(LeaderEpochFileCache.scala:263)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$3$adapted(LeaderEpochFileCache.scala:259)
        at kafka.server.epoch.LeaderEpochFileCache$$Lambda$571/0x000000080045f040.apply(Unknown Source)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromStart$1(LeaderEpochFileCache.scala:259)
        at kafka.server.epoch.LeaderEpochFileCache.truncateFromStart(LeaderEpochFileCache.scala:254)
        at kafka.log.UnifiedLog.$anonfun$maybeIncrementLogStartOffset$4(UnifiedLog.scala:1043)
        at kafka.log.UnifiedLog.$anonfun$maybeIncrementLogStartOffset$4$adapted(UnifiedLog.scala:1043)
        at kafka.log.UnifiedLog$$Lambda$2324/0x0000000800b59040.apply(Unknown Source)
        at scala.Option.foreach(Option.scala:437)
        at kafka.log.UnifiedLog.maybeIncrementLogStartOffset(UnifiedLog.scala:1043)
        - locked <0x0000000680fc5080> (a java.lang.Object)
        at kafka.cluster.Partition.$anonfun$deleteRecordsOnLeader$1(Partition.scala:1476)
        at kafka.cluster.Partition.deleteRecordsOnLeader(Partition.scala:1463)
        at kafka.server.ReplicaManager.$anonfun$deleteRecordsOnLocalLog$2(ReplicaManager.scala:687)
        at kafka.server.ReplicaManager$$Lambda$3156/0x0000000800d7c840.apply(Unknown Source)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at kafka.server.ReplicaManager.deleteRecordsOnLocalLog(ReplicaManager.scala:680)
        at kafka.server.ReplicaManager.deleteRecords(ReplicaManager.scala:875)
        at kafka.server.KafkaApis.handleDeleteRecordsRequest(KafkaApis.scala:2216)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:196)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
        at java.lang.Thread.run(java.base@11.0.17/Thread.java:829)
{code}

LeaderEpoch checkpointing also calls fsync while holding Log#lock, blocking request-handler threads from appending in the meantime.

This is called by the scheduler thread on log-segment-breaching, so it might be less frequent than log roll, though.

Does it make sense to also make the LeaderEpochCheckpointFile flush asynchronous?


> Produce performance issue under high disk load
> ----------------------------------------------
>
>                 Key: KAFKA-15046
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15046
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 3.3.2
>            Reporter: Haruki Okada
>            Priority: Major
>              Labels: performance
>         Attachments: image-2023-06-01-12-46-30-058.png, image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
> ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
> ** The 99th-percentile producer response time got quite bad when we performed replica reassignment on the cluster.
> *** The RequestQueue scope was significant.
> ** Request-time throttling also happened at the time of the incident. This caused producers to delay sending messages in the meantime.
> ** The disk I/O latency was higher than usual due to the high load from replica reassignment.
> *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
> * Analysis:
> ** The request-handler utilization was much higher than usual.
> *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
> ** Also, thread time utilization was much higher than usual on almost all users.
> *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
> ** We took jstack dumps several times. In most of them, a request-handler was doing fsync to flush ProducerState while other request-handlers were waiting on Log#lock to append messages.
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 cpu=51264789.27ms elapsed=599242.76s tid=0x00007efdaeba7770 nid=0x1e704 runnable [0x00007ef9a12e2000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native Method)
>         at sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
>         at sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
>         at kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
>         at kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
>         at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
>         - locked <0x000000060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
>         - locked <0x000000060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
>         - locked <0x000000060d75d820> (a java.lang.Object)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
>         at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
>         at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
>         at kafka.server.ReplicaManager$$Lambda$2379/0x0000000800b7c040.apply(Unknown Source)
>         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
>         at java.lang.Thread.run(java.base@11.0.17/Thread.java:829)
> {code}
> ** There were also a bunch of log entries showing that writing producer snapshots took hundreds of milliseconds.
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] Wrote producer snapshot at offset 247996937813 with 0 producer ids in 547 ms. (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:38,887] INFO [ProducerStateManager partition=zzz-9] Wrote producer snapshot at offset 226222355404 with 0 producer ids in 576 ms. (kafka.log.ProducerStateManager)
> ...
> {code}
> * From the analysis, we summarized the issue as follows:
> ** 1. Disk write latency got worse due to the replica reassignment.
> *** We already use a replication quota, and lowering it further may not be acceptable because the reassignment would take too long.
> ** 2. ProducerStateManager#takeSnapshot started to take time due to fsync latency.
> *** This is done at every log segment roll.
> *** In our case, the broker hosts high-load partitions, so log rolls occur very frequently.
> ** 3. While ProducerStateManager#takeSnapshot is doing fsync, all subsequent produce requests to the partition are blocked by Log#lock.
> ** 4. While produce requests wait for the lock, they consume request-handler thread time, which is accounted as thread time utilization and caused throttling.
> * Suggestion:
> ** We didn't see this phenomenon when we used Kafka 2.4.1.
> *** ProducerState fsync was introduced in 2.8.0 by this: https://issues.apache.org/jira/browse/KAFKA-9892
> ** The reason why ProducerState needs to be fsynced is not well described in the above ticket, but we think fsync is not really necessary here, because:
> *** If the ProducerState snapshot file turns out not to have been written to disk after a power failure, it will simply be rebuilt from the logs.
> *** Also, even if the ProducerState snapshot was corrupted after a power failure, it will be rebuilt from the logs thanks to the CRC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
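
For reference, the idea of moving the flush outside of the lock could be sketched roughly as below. This is only an illustration under stated assumptions, not Kafka code: `CheckpointedLog`, `flushLock`, `flushUnderLock` and `flushOutsideLock` are hypothetical names, and the list of strings merely stands in for the in-memory state being checkpointed. The state is copied while the append lock is held briefly, and the slow `FileChannel.force` call runs after that lock is released, so appends no longer wait for the disk.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a log with checkpointed in-memory state; not Kafka code. */
final class CheckpointedLog {
    private final Object lock = new Object();      // guards entries (plays the role of Log#lock)
    private final Object flushLock = new Object(); // serializes checkpoint writes only
    private final List<String> entries = new ArrayList<>();
    private final Path checkpoint;

    CheckpointedLog(Path checkpoint) {
        this.checkpoint = checkpoint;
    }

    /** Appends only contend on `lock`, which is never held across disk I/O below. */
    void append(String entry) {
        synchronized (lock) {
            entries.add(entry);
        }
    }

    /** Problematic pattern: fsync runs while `lock` is held, so append() stalls behind it. */
    void flushUnderLock() throws IOException {
        synchronized (lock) {
            writeAndSync(new ArrayList<>(entries));
        }
    }

    /** Sketch of the alternative: copy under `lock`, write and fsync after releasing it. */
    void flushOutsideLock() throws IOException {
        // Taking the copy inside flushLock keeps concurrent flushes ordered, so an
        // older snapshot cannot overwrite a newer one on disk.
        synchronized (flushLock) {
            List<String> snapshot;
            synchronized (lock) {
                snapshot = new ArrayList<>(entries);
            }
            writeAndSync(snapshot);
        }
    }

    private void writeAndSync(List<String> snapshot) throws IOException {
        byte[] bytes = String.join("\n", snapshot).getBytes(StandardCharsets.UTF_8);
        try (FileChannel channel = FileChannel.open(checkpoint,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(true); // the fsync that is slow under high disk load
        }
    }
}
```

The trade-off is that the file on disk may briefly lag behind the in-memory state. Whether that is acceptable for LeaderEpochCheckpointFile or ProducerState snapshots depends on the recovery guarantees discussed in this ticket.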