[ 
https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucian Ilie updated KAFKA-15572:
--------------------------------
    Description: 
We are using a Kafka cluster with Zookeeper, deployed on top of Kubernetes, 
using banzaicloud/koperator.

We have multiple disks per broker.
We are using Cruise Control remove disk operation in order to aggregate 
multiple smaller disks into a single bigger disk.

When we do this, the operation fails, apparently at random, with 
NoSuchFileException. A sample of logs covering the exception and the 
preceding operations is attached.

The root cause is detailed below.

Say we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger 
disk /new-kafka-logs1/kafka
 * broker 201 with same disks
 * broker 301 with same disks

When Cruise Control executes a remove disk operation, it calls Kafka's 
"adminClient.alterReplicaLogDirs(replicaAssignment)" with an assignment that 
moves all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
/new-kafka-logs1/kafka.
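Conceptually, the remove disk operation boils down to mapping every replica on the disks being removed to the single target disk. The sketch below illustrates the shape of that assignment; the real Admin API keys on TopicPartitionReplica(topic, partition, brokerId), but plain strings are used here to keep the sketch self-contained.

```java
import java.util.*;

// Illustrative sketch of the replica-to-log-dir assignment a remove disk
// operation produces: every replica living on the disks being removed is
// mapped to the one big target disk. Replica keys are simplified strings
// ("topic-partition@brokerId"), not the real TopicPartitionReplica type.
public class RemoveDiskAssignment {
    public static void main(String[] args) {
        List<String> replicasOnOldDisks = List.of("topic-0@101", "topic-1@101", "topic-2@101");
        String targetDir = "/new-kafka-logs1/kafka";

        Map<String, String> assignment = new LinkedHashMap<>();
        for (String replica : replicasOnOldDisks) {
            assignment.put(replica, targetDir); // move every replica to the big disk
        }
        assignment.forEach((r, d) -> System.out.println(r + " -> " + d));
    }
}
```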

During the alter log dir operation, future logs are created (to move data from 
e.g. "/kafka-logs1/kafka/topic-partition" to 
"/new-kafka-logs1/kafka/topic-partition.hash-future"), data is moved, and 
finally the log dir is renamed from 
"/new-kafka-logs1/kafka/topic-partition.hash-future" to 
"/new-kafka-logs1/kafka/topic-partition". This operation starts in 
[UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
 and is guarded by the [UnifiedLog 
lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The rename is then delegated to 
[LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
 This is the first half of the race condition.
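Simplified, the rename is a two-step operation: the directory is moved on disk first, and only afterwards is the in-memory reference updated. The sketch below (field names are illustrative, not the actual Kafka internals) shows that a reader observing the reference between the two steps sees a path that no longer exists:

```java
import java.io.IOException;
import java.nio.file.*;

// Illustrative sketch of the two-step rename in LocalLog.renameDir:
// step 1 moves the directory on disk, step 2 updates the in-memory pointer.
// Between the two steps, the pointer names a directory that is gone.
public class RenameSketch {
    public static void main(String[] args) throws IOException {
        Path logDirRoot = Files.createTempDirectory("new-kafka-logs1");
        Path futureDir = Files.createDirectory(logDirRoot.resolve("topic-0.abcd-future"));
        Path finalDir = logDirRoot.resolve("topic-0");

        Path dir = futureDir;                  // in-memory reference (like LocalLog._dir)

        Files.move(futureDir, finalDir);       // step 1: rename on disk succeeds
        // <-- race window: 'dir' still points at topic-0.abcd-future,
        //     which no longer exists on disk
        System.out.println(Files.exists(dir)); // false
        dir = finalDir;                        // step 2: reference updated
        System.out.println(Files.exists(dir)); // true
    }
}
```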

Meanwhile, log dirs can be rolled based on known conditions (e.g. a segment 
getting full), which calls 
[UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
 also guarded by the UnifiedLog lock. However, the subsequent delegation to 
UnifiedLog.flushUptoOffsetExclusive does not share that lock, since it runs 
as a scheduled task on a separate thread. This means the follow-up operations 
are [not locked at the UnifiedLog 
level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The operation is further delegated to 
[LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
 which also tries to [flush the log 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

Since the log dir flush does not share a lock with the rename dir operation, 
the rename might succeed in moving the log dir on disk to "topic-partition" 
while LocalLog._dir still points to "topic-partition.hash-future"; when the 
flush then attempts to flush the "topic-partition.hash-future" directory, it 
throws NoSuchFileException: "topic-partition.hash-future". Basically, [this 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 might succeed, and before [this other 
line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 is executed, the flush tries to [flush the future 
dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].

We tested a fix, as a patch on Kafka 3.4.1, on our clusters, and synchronizing 
the flush dir operation solved the issue. We will reply with a link to a PR.
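The shape of that fix can be illustrated by taking the same lock in both paths. The sketch below is a hypothetical simulation of the synchronization, not the actual patch: once flush reads the directory reference and touches the filesystem under the lock that renameDir holds, it can never observe the stale ".future" path after the on-disk move.

```java
import java.io.IOException;
import java.nio.file.*;

// Hypothetical sketch of the fix: flushDir reads the directory reference and
// checks the filesystem while holding the same lock that renameDir holds, so
// the on-disk move and the pointer update appear atomic to the flush path.
public class SynchronizedFlushSketch {
    private final Object lock = new Object(); // stands in for the UnifiedLog lock
    private volatile Path dir;                // stands in for LocalLog._dir

    SynchronizedFlushSketch(Path dir) { this.dir = dir; }

    void renameDir(Path target) throws IOException {
        synchronized (lock) {
            Files.move(dir, target); // on-disk rename
            dir = target;            // pointer update, atomic w.r.t. flushDir
        }
    }

    boolean flushDir() {
        synchronized (lock) {        // the fix: share the lock with renameDir
            // a real flush would fsync the directory; checking existence is
            // enough here to show the stale-path window is closed
            return Files.exists(dir);
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("kafka-logs");
        Path future = Files.createDirectory(root.resolve("topic-0.abcd-future"));
        SynchronizedFlushSketch log = new SynchronizedFlushSketch(future);
        log.renameDir(root.resolve("topic-0"));
        System.out.println(log.flushDir()); // true: flush always sees the live dir
    }
}
```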

Note that this bug reproduces in every version since 3.0.0; it was introduced 
by [this 
commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341]
 when the log dir flush was added.


> Race condition between future log dir roll and replace current with future 
> log during alterReplicaLogDirs
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15572
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15572
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.1, 3.4.1
>            Reporter: Lucian Ilie
>            Priority: Major
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
