Haruki Okada created KAFKA-19407:
------------------------------------

             Summary: OffsetsOutOfOrderException on followers due to a race condition in the leader
                 Key: KAFKA-19407
                 URL: https://issues.apache.org/jira/browse/KAFKA-19407
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.3.2
            Reporter: Haruki Okada
            Assignee: Haruki Okada
         Attachments: image-2025-06-13-23-01-40-371.png

h2. Environment
 * Kafka version: 3.3.2 (but we suppose this issue still exists in the latest Kafka)
 * Replication factor: 3

h2. Phenomenon

We experienced a partition in our cluster suddenly becoming UnderMinISR, without any hardware/network issue or any operation being performed.
{code:java}
[2025-06-10 20:27:14,310] INFO [Partition topic-X-49 broker=17] Shrinking ISR 
from 15,16,17 to 17. Leader: (highWatermark: 572579089, endOffset: 572579215). 
Out of sync replicas: (brokerId: 15, endOffset: 572579089) (brokerId: 16, 
endOffset: 572579089). (kafka.cluster.Partition)
{code}
On both followers, we saw the following log:
{code:java}
[2025-06-10 20:26:59,804] ERROR [ReplicaFetcher replicaId=16, leaderId=17, 
fetcherId=1] Unexpected error occurred while processing data for partition 
topic-X-49 at offset 572579089 (kafka.server.ReplicaFetcherThread)
kafka.common.OffsetsOutOfOrderException: Out of order offsets found in append 
to topic-X-49: ArrayBuffer(572579089, 572579090, 572579091, 572579092, 
572579093, 572579094, 572579095, 572579096, 572579097, 572579098, 572579099, 
572579100, 572579101, 572579102, 572579103, 572579104, 572579105, 572579106, 
572579107, 572579108, 572579109, 572579089,...
{code}
The log shows an offset regression after 572579109 (back to 572579089).
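
For reference, the validation behind this error is essentially a monotonicity check over the offsets of the batch being appended. Below is a minimal sketch of that check (not Kafka's actual implementation; the real code throws kafka.common.OffsetsOutOfOrderException):
{code:scala}
// Minimal sketch (not Kafka's actual implementation) of the validation behind the error above:
// the offsets of the records a follower appends must be monotonically increasing.
object OffsetOrderCheck {
  def assertMonotonic(topicPartition: String, offsets: Seq[Long]): Unit = {
    val monotonic = offsets.zip(offsets.drop(1)).forall { case (prev, next) => prev < next }
    if (!monotonic)
      throw new IllegalStateException(
        s"Out of order offsets found in append to $topicPartition: $offsets")
  }

  def main(args: Array[String]): Unit = {
    // The sequence from the follower log: 572579089..572579109 followed by 572579089 again.
    val offsets = (572579089L to 572579109L) :+ 572579089L
    assertMonotonic("topic-X-49", offsets) // throws: the last offset regresses to 572579089
  }
}
{code}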
h2. Analysis
h3. The cause of the offset regression

We dumped the active log segment on the leader and confirmed that the leader's log actually contains records with non-monotonically assigned offsets.
So the problem must exist on the leader rather than on the followers.

On the leader, we found the following log output right before the followers experienced the OffsetsOutOfOrderException.
{code:java}
[2025-06-10 20:26:37,463] ERROR [ReplicaManager broker=17] Error processing 
append operation on partition topic-X-49 (kafka.server.ReplicaManager)
java.lang.IllegalStateException: Attempt to append a timestamp (1749554796970) 
to slot 1 no larger than the last timestamp appended (1749554797191) to 
/path/to/kafka-log-dir/topic-X-49/00000000000572579060.timeindex.
        at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:128)
        at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114)
        at kafka.log.LogSegment.append(LogSegment.scala:167)
        at kafka.log.LocalLog.append(LocalLog.scala:442)
        at kafka.log.UnifiedLog.append(UnifiedLog.scala:950)
        at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
        at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
        at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
        at 
scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at 
scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:35)
        at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}
This explains the offset regression as follows (a minimal sketch follows the list):
 * Due to the IllegalStateException thrown in the middle of handling the produce request, LogEndOffset wasn't updated even though the records had already been written to the log segment.
 * When the leader handles subsequent produce requests, it assigns the same offsets to the new record batches.
 ** => A non-monotonic offset sequence appears in the leader's log, which causes the OffsetsOutOfOrderException on the followers.
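
As an illustration, here is a minimal sketch (hypothetical names, not Kafka's actual code) of why an exception thrown after the segment write but before the LogEndOffset update leads to duplicate offsets on the next append:
{code:scala}
// Minimal sketch (hypothetical names, not Kafka's actual code): the leader assigns offsets
// starting from its current LogEndOffset and only advances it after the whole append succeeds,
// so an exception thrown after the segment write leaves already-written offsets behind.
object DuplicateOffsetSketch {
  private var logEndOffset: Long = 572579089L
  private val segment = scala.collection.mutable.ArrayBuffer.empty[Long]

  def appendAsLeader(recordCount: Int, failAfterSegmentWrite: Boolean = false): Unit = {
    val baseOffset = logEndOffset                                 // offsets assigned from the current LEO
    (0 until recordCount).foreach(i => segment += baseOffset + i) // records hit the segment first
    if (failAfterSegmentWrite)                                    // e.g. the time-index append throws here
      throw new IllegalStateException("append failed after records were written")
    logEndOffset = baseOffset + recordCount                       // LEO advances only on success
  }

  def main(args: Array[String]): Unit = {
    try appendAsLeader(21, failAfterSegmentWrite = true)          // 572579089..572579109 written, LEO unchanged
    catch { case _: IllegalStateException => }                    // the produce request just fails
    appendAsLeader(1)                                             // 572579089 is assigned and written again
    println(segment.mkString(", "))                               // ..., 572579109, 572579089 (non-monotonic)
  }
}
{code}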

h3. The cause of IllegalStateException

The exception says that the broker tried to append timestamp 1749554796970, which is smaller than the last entry in the time index (1749554797191); that is invalid.

The timestamp about to be appended here is [LogSegment#maxTimestampSoFar|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/log/LogSegment.scala#L355], which means that maxTimestampSoFar regressed somehow.

After reading the code, we found that the race condition below is possible, and it explains the phenomenon we experienced:


!image-2025-06-13-23-01-40-371.png|width=597,height=471!

We actually confirmed that the log segment was rolled right before the phenomenon (i.e. entering the [if block at LogSegment#L106|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/log/LogSegment.scala#L106] is possible) and that there were clients calling the listOffsets API on the partition, which matches the scenario.
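
To make the check-then-act nature concrete, below is a heavily simplified sketch (hypothetical names; the real flow is in the attached diagram) of one interleaving consistent with the above: a reader that takes the lazy-initialization branch can publish a stale value read from the time index after the appender has already recorded a newer maximum, so maxTimestampSoFar regresses and the next time-index append fails:
{code:scala}
// Minimal, self-contained sketch of the check-then-act race (hypothetical names,
// heavily simplified from LogSegment / TimeIndex).
object MaxTimestampRaceSketch {
  private val Unknown = -1L

  @volatile private var maxTimestampSoFar: Long  = Unknown // state of a freshly rolled segment
  @volatile private var timeIndexLastEntry: Long = Unknown // empty time index

  // Appender path: update the in-memory max, then append it to the time index.
  def append(batchMaxTimestamp: Long): Unit = {
    if (batchMaxTimestamp > maxTimestampSoFar)
      maxTimestampSoFar = batchMaxTimestamp
    if (maxTimestampSoFar < timeIndexLastEntry)              // TimeIndex's sanity check
      throw new IllegalStateException(
        s"Attempt to append a timestamp ($maxTimestampSoFar) no larger than the last " +
        s"timestamp appended ($timeIndexLastEntry)")
    timeIndexLastEntry = maxTimestampSoFar
  }

  def main(args: Array[String]): Unit = {
    // 1. Reader (e.g. a listOffsets handler, not holding the log lock) takes the
    //    lazy-initialization branch: sees Unknown, reads the empty time index,
    //    then is preempted before publishing the value.
    val readerSawUnknown = maxTimestampSoFar == Unknown
    val readerStaleValue = timeIndexLastEntry

    // 2. Appender records the real max and writes a time index entry.
    append(1749554797191L)

    // 3. Reader resumes and finishes the check-then-act: it overwrites the newer max
    //    with the stale value -> maxTimestampSoFar regresses.
    if (readerSawUnknown) maxTimestampSoFar = readerStaleValue

    // 4. The next append trips the time index sanity check; in the real code this happens
    //    after the records were already written to the segment, which is what leaves the
    //    leader's log with duplicate offsets.
    append(1749554796970L)
  }
}
{code}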

h2. Suggested Fix

TBD.
This is a typical check-then-act race and there are several possible ways to fix the issue, but we are not yet sure which way is best (a generic illustration of the pattern follows). Anyway, we plan to send patches later.
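
Purely as an illustration of the general pattern (not a proposed patch), one common way to remove a check-then-act on a lazily initialized field is to turn the initialization into a single compare-and-set, so a stale value can never overwrite a newer one:
{code:scala}
import java.util.concurrent.atomic.AtomicLong

// Illustration only (not a proposed patch, hypothetical names): a lazy
// "initialize-if-unknown" that uses compareAndSet cannot clobber a newer value
// installed concurrently by the appender.
object LazyInitWithCas {
  private val Unknown = -1L
  private val maxTimestampSoFar = new AtomicLong(Unknown)

  // Reader path: installs the value read from the time index only if the field is still Unknown.
  def readMaxTimestamp(timeIndexLastEntry: => Long): Long = {
    val current = maxTimestampSoFar.get()
    if (current != Unknown) current
    else {
      maxTimestampSoFar.compareAndSet(Unknown, timeIndexLastEntry) // no-op if an appender won the race
      maxTimestampSoFar.get()
    }
  }

  // Appender path: monotonically advance the max, never move it backwards.
  def onAppend(batchMaxTimestamp: Long): Unit =
    maxTimestampSoFar.getAndUpdate(current => math.max(current, batchMaxTimestamp))
}
{code}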


