[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucian Ilie updated KAFKA-15572:
--------------------------------
    Description: 
We are running a Kafka cluster with ZooKeeper, deployed on Kubernetes via banzaicloud/koperator, with multiple disks per broker. We use the Cruise Control remove disk operation to consolidate multiple smaller disks into a single bigger disk. When we do this, the operation fails, apparently at random, with NoSuchFileException. A sample of the logs for the exception and the preceding operations is attached.

The cause is as follows. Say we have 3 brokers:
* broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka
* broker 201 with the same disks
* broker 301 with the same disks

When Cruise Control executes a remove disk operation, it calls Kafka "adminClient.alterReplicaLogDirs(replicaAssignment)" with an assignment that moves all data from /kafka-logs1/kafka and /kafka-logs2/kafka to /new-kafka-logs1/kafka.

During the alter log dir operation, future logs are created (to move data from e.g. "/kafka-logs1/kafka/topic-partition" to "/new-kafka-logs1/kafka/topic-partition.hash-future"), the data is moved, and finally the log dir is renamed from "/new-kafka-logs1/kafka/topic-partition.hash-future" to "/new-kafka-logs1/kafka/topic-partition". This rename starts in [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] and is guarded by the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The rename is then delegated to [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. This is the first half of the race condition.

Meanwhile, log dirs can be rolled based on known conditions (e.g. getting full), which calls [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], also guarded by the UnifiedLog lock. However, the subsequent delegation to UnifiedLog.flushUptoOffsetExclusive does not hold that lock, because it runs as a scheduled task on a separate thread. This means the follow-up operations are [not locked at UnifiedLog level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The flush is delegated to [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], which also tries to [flush the log dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

Because the log dir flush does not share the lock with the rename dir operation, the rename may succeed in moving the log dir on disk to "topic-partition" while LocalLog._dir still points at "topic-partition.hash-future"; when the flush then attempts to flush the "topic-partition.hash-future" directory, it throws NoSuchFileException: "topic-partition.hash-future". Basically, the on-disk move in [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] might succeed, and before the [_dir reference is updated|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113] in that same method, the flush tries to [flush the future dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

We tested a fix, as a patch on Kafka 3.4.1, on our clusters; synchronizing the flush dir operation solved the issue. Will reply with a link to a PR.
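The interleaving can be illustrated outside Kafka. Below is a minimal, self-contained Java sketch (all names are hypothetical; this is not Kafka code): renameDir moves the directory on disk and then updates the in-memory reference, while flushDir fsyncs whatever the reference points at. Run sequentially, part 1 simulates the bad interleaving, where the flush still holds the stale "*.hash-future" path after the move and fails with NoSuchFileException; part 2 shows the flow with the shared lock, which corresponds to the fix we tested.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Minimal sketch of the race and the fix; NOT Kafka code, names are illustrative.
public class LogDirRaceSketch {

    // Fsync a directory the way a log-dir flush does: open it read-only, force.
    // Throws UncheckedIOException(NoSuchFileException) if the path is gone.
    static void fsyncDir(Path d) {
        try (FileChannel ch = FileChannel.open(d, StandardOpenOption.READ)) {
            ch.force(true);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static final Object lock = new Object(); // plays the role of the UnifiedLog lock
    static volatile Path dir;                // plays the role of LocalLog._dir

    // Fixed rename: move on disk and update the reference under the lock.
    static void renameDir(Path newDir) {
        synchronized (lock) {
            try {
                Files.move(dir, newDir, StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            dir = newDir;
        }
    }

    // Fixed flush: read the reference and fsync under the same lock, so it can
    // never observe the on-disk rename while still holding the stale path.
    static void flushDir() {
        synchronized (lock) {
            fsyncDir(dir);
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("kafka-15572-sketch");
        Path future = Files.createDirectory(base.resolve("topic-0.hash-future"));
        Path current = base.resolve("topic-0");

        // 1. Buggy interleaving, simulated sequentially: the on-disk move has
        //    happened, but the flushing thread still holds the old path.
        Files.move(future, current, StandardCopyOption.ATOMIC_MOVE);
        try {
            fsyncDir(future); // flush of the stale "*.hash-future" path
            throw new AssertionError("expected NoSuchFileException");
        } catch (UncheckedIOException e) {
            System.out.println("unsynchronized flush failed: " + e.getCause());
        }

        // 2. Fixed flow: rename and flush share one lock, so flush always sees
        //    a consistent pair of on-disk path and in-memory reference.
        Files.move(current, future, StandardCopyOption.ATOMIC_MOVE); // reset
        dir = future;
        renameDir(current);
        flushDir(); // flushes "topic-0"; no exception
        System.out.println("synchronized flush ok: " + dir.getFileName());
    }
}
```

Note the sketch relies on being able to open and fsync a directory, which works on Linux (the platform where we observed the bug) but not on all operating systems.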
Note that this bug replicates on every version since 3.0.0, caused by [this commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341] when flush dir was added.

> Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15572
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15572
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.1, 3.4.1
>            Reporter: Lucian Ilie
>            Priority: Major
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)