Ivan Yurchenko created KAFKA-19880:
--------------------------------------
Summary: First batch in producer epoch can be appended
out-of-order even for idempotent producer
Key: KAFKA-19880
URL: https://issues.apache.org/jira/browse/KAFKA-19880
Project: Kafka
Issue Type: Bug
Components: core, producer
Affects Versions: 4.1.0, 4.0.1
Reporter: Ivan Yurchenko
I observe the following behavior. A Java producer with enable.idempotence=true
and max.in.flight.requests.per.connection=5 sends enough records to form several
batches across several Produce requests (a sketch of the setup follows). The
topic is freshly created and empty.
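For context, a minimal sketch of the producer setup that reproduces this (the bootstrap address, topic name and record payloads are illustrative placeholders, not the exact values from my run):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send enough records to fill several batches and several in-flight Produce requests.
            for (int i = 0; i < 10_000; i++) {
                producer.send(new ProducerRecord<>("input-topic", "record " + i));
            }
            producer.flush();
        }
    }
}
{code}
The producer DEBUG logs then show the batches being assigned sequence numbers and sent to the leader: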
{code:java}
[2025-11-11 11:41:19,125] DEBUG [Producer clientId=producer-1] Assigned
producerId 0 and producerEpoch 0 to batch with base sequence 0 being sent to
partition input-topic-0
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,132] DEBUG [Producer clientId=producer-1] Sending PRODUCE
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
clientId=producer-1, correlationId=21, headerVersion=2) and timeout 30000 to
node 3:
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16383]}
(org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned
producerId 0 and producerEpoch 0 to batch with base sequence 1458 being sent to
partition input-topic-0
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending PRODUCE
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
clientId=producer-1, correlationId=22, headerVersion=2) and timeout 30000 to
node 3:
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
(org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned
producerId 0 and producerEpoch 0 to batch with base sequence 2823 being sent to
partition input-topic-0
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending PRODUCE
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
clientId=producer-1, correlationId=23, headerVersion=2) and timeout 30000 to
node 3:
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
(org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned
producerId 0 and producerEpoch 0 to batch with base sequence 4188 being sent to
partition input-topic-0
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending PRODUCE
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
clientId=producer-1, correlationId=24, headerVersion=2) and timeout 30000 to
node 3:
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
(org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Assigned
producerId 0 and producerEpoch 0 to batch with base sequence 5553 being sent to
partition input-topic-0
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending PRODUCE
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13,
clientId=producer-1, correlationId=25, headerVersion=2) and timeout 30000 to
node 3:
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
(org.apache.kafka.clients.NetworkClient){code}
The cluster and the target broker are experiencing a connectivity issue that
affects metadata propagation. For some time the target broker does not consider
itself the leader of the partition, so it rejects the first requests:
{code:java}
[2025-11-11 11:41:20,456] DEBUG [KafkaApi-3] Produce request with correlation
id 21 from client producer-1 on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0
failed due to org.apache.kafka.common.errors.NotLeaderOrFollowerException
(kafka.server.KafkaApis)
[2025-11-11 11:41:24,876] DEBUG [KafkaApi-3] Produce request with correlation
id 22 from client producer-1 on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0
failed due to org.apache.kafka.common.errors.NotLeaderOrFollowerException
(kafka.server.KafkaApis)
{code}
Then it learns that it is the leader of the partition, but it does not yet know
about the other replica:
{code:java}
[2025-11-11 11:41:45,498] ERROR [ReplicaManager broker=3] Error processing
append operation on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the
current ISR : 1 is insufficient to satisfy the min.isr requirement of 2 for
partition input-topic-0, live replica(s) broker.id are : Set(3)
[2025-11-11 11:41:45,498] DEBUG [KafkaApi-3] Produce request with correlation
id 23 from client producer-1 on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0
failed due to org.apache.kafka.common.errors.NotEnoughReplicasException
(kafka.server.KafkaApis){code}
Finally, the request with correlation ID 24 succeeds. So the first batch actually
written (the one with base sequence 4188 rather than 0) is logically out of
sequence, yet it is accepted.
The previously failed batches may be retried later with a new producer epoch
(as happened in my case), but that does not help: the intended order of the
records is already violated.
I boiled this down to the following test (in ReplicaManagerTest):
{noformat}
@Test
def testX(): Unit = {
  val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
  val node = new Node(0, "host0", 0)
  mockGetAliveBrokerFunctions(metadataCache, Seq(node))
  val rm = new ReplicaManager(
    metrics = metrics,
    config = config,
    time = time,
    scheduler = new MockScheduler(time),
    logManager = mockLogMgr,
    quotaManagers = quotaManager,
    metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
    logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
    alterPartitionManager = alterPartitionManager,
    threadNamePrefix = Option(this.getClass.getName))
  try {
    val tp = new TopicPartition(topic, 0)
    val partition = rm.createPartition(tp)
    partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
    // Make this broker the leader of the partition, with itself as the only replica in the ISR.
    rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
      Seq(new LeaderAndIsrRequest.PartitionState()
        .setTopicName(topic)
        .setPartitionIndex(0)
        .setControllerEpoch(0)
        .setLeader(config.brokerId())
        .setLeaderEpoch(0)
        .setIsr(Seq[Integer](config.brokerId()).asJava)
        .setPartitionEpoch(0)
        .setReplicas(Seq[Integer](config.brokerId()).asJava)
        .setIsNew(false)).asJava,
      Collections.singletonMap(topic, topicId),
      Set(node).asJava).build(), (_, _) => ())
    rm.getPartitionOrException(tp).localLogOrException
    val topicIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, partition.topicPartition)
    val producerId = 10L
    val producerEpoch: Short = 10
    // Two idempotent batches from the same producer epoch, with base sequences 0 and 1.
    val r0 = MemoryRecords.withIdempotentRecords(Compression.NONE,
      producerId, producerEpoch, 0, new SimpleRecord("record 0".getBytes()))
    val r1 = MemoryRecords.withIdempotentRecords(Compression.NONE,
      producerId, producerEpoch, 1, new SimpleRecord("record 1".getBytes()))
    // val order = List(r0, r1) // normal order -- no ordering violation
    val order = List(r1, r0) // r1 will be accepted out of order; r0 will be retried later with a newer epoch and accepted
    def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]): Unit = {
      println(responseStatus(topicIdPartition).error)
    }
    for (r <- order) {
      rm.appendRecords(
        timeout = 1000,
        requiredAcks = -1,
        internalTopicsAllowed = false,
        origin = AppendOrigin.CLIENT,
        entriesPerPartition = Map(topicIdPartition -> r),
        responseCallback = callback,
      )
    }
  } finally {
    rm.shutdown(checkpointHW = false)
  }
}
{noformat}
As far as I understand, the partition doesn't actually need to be empty. It's
enough for the producer state to be empty and for the first requests in the
producer epoch to fail as above.
If the first request in the producer epoch succeeds, this situation cannot occur.
For example, if we send r1, r2, r3, and r1 succeeds while r2 fails with
NotLeaderOrFollowerException, then r3 won't be accepted: it will fail with
OutOfOrderSequenceException.
I attribute this difference to [this piece of validation on the broker side|https://github.com/apache/kafka/blob/3479ce793bafc6a1c42e6afa77e3fbfc3a36c80c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java#L142-L148].
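For readers who don't want to chase the link, here is my paraphrase of that check (a simplified sketch of how I read it, not the literal ProducerAppendInfo code):
{code:java}
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.record.RecordBatch;

public class CurrentNewEpochSequenceCheck {
    // "currentEpoch" is the epoch in the broker's producer state for this producer ID;
    // it is RecordBatch.NO_PRODUCER_EPOCH (-1) when the broker has no state for the producer yet.
    public static void check(short currentEpoch, short requestEpoch, int appendFirstSeq) {
        if (requestEpoch != currentEpoch && appendFirstSeq != 0) {
            if (currentEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
                // Known producer with a bumped epoch: the first batch must start at sequence 0.
                throw new OutOfOrderSequenceException("Invalid sequence number for new epoch: " + appendFirstSeq);
            }
            // No producer state at all: any first sequence is accepted.
            // This is what lets the batch with base sequence 4188 through in the scenario above.
        }
    }
}
{code}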
AFAIU, this is a bug, at least by my reading of the documentation: I would expect
enable.idempotence to prevent this from happening. Am I right, or is this
behavior acceptable and should we improve the documentation instead?
Could we require on the broker side that the first batch in a producer epoch
always starts with sequence 0?
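Concretely, what I have in mind is something like dropping the empty-state carve-out from the check sketched above (again, just an illustration of the idea, not a patch):
{code:java}
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

public class ProposedNewEpochSequenceCheck {
    // Hypothetical tightened rule: the first batch the broker sees for a producer epoch
    // must start at sequence 0, even when there is no prior producer state at all.
    public static void check(short currentEpoch, short requestEpoch, int appendFirstSeq) {
        if (requestEpoch != currentEpoch && appendFirstSeq != 0) {
            // No exemption for an unknown producer: a non-zero first sequence means
            // earlier batches of this epoch were lost, rejected or reordered.
            throw new OutOfOrderSequenceException("First batch of producer epoch " + requestEpoch
                + " must start at sequence 0, got " + appendFirstSeq);
        }
    }
}
{code}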