Ivan Yurchenko created KAFKA-19880:
--------------------------------------

             Summary: First batch in producer epoch can be appended 
out-of-order even for idempotent producer
                 Key: KAFKA-19880
                 URL: https://issues.apache.org/jira/browse/KAFKA-19880
             Project: Kafka
          Issue Type: Bug
          Components: core, producer 
    Affects Versions: 4.1.0, 4.0.1
            Reporter: Ivan Yurchenko


I observe the following behavior. A Java producer with enable.idempotence=true 
and max.in.flight.requests.per.connection=5 sends a number of records, enough 
to form several batches across several Produce requests. The topic is freshly 
created and empty.
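For context, the producer is set up roughly like this (a minimal sketch: the class 
name, bootstrap address, topic name, and record count are placeholders I picked for 
illustration; everything else is defaults):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        props.put(ProducerConfig.ACKS_CONFIG, "all");  // acks=-1, as seen in the request logs below
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Send enough records to fill several batches across several in-flight Produce requests.
            for (int i = 0; i < 10_000; i++) {
                producer.send(new ProducerRecord<>("input-topic", "record " + i));
            }
        }
    }
}
{code}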

 
{code:java}
[2025-11-11 11:41:19,125] DEBUG [Producer clientId=producer-1] Assigned 
producerId 0 and producerEpoch 0 to batch with base sequence 0 being sent to 
partition input-topic-0 
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,132] DEBUG [Producer clientId=producer-1] Sending PRODUCE 
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
clientId=producer-1, correlationId=21, headerVersion=2) and timeout 30000 to 
node 3: 
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16383]}
 (org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned 
producerId 0 and producerEpoch 0 to batch with base sequence 1458 being sent to 
partition input-topic-0 
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending PRODUCE 
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
clientId=producer-1, correlationId=22, headerVersion=2) and timeout 30000 to 
node 3: 
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
 (org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned 
producerId 0 and producerEpoch 0 to batch with base sequence 2823 being sent to 
partition input-topic-0 
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Sending PRODUCE 
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
clientId=producer-1, correlationId=23, headerVersion=2) and timeout 30000 to 
node 3: 
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
 (org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,133] DEBUG [Producer clientId=producer-1] Assigned 
producerId 0 and producerEpoch 0 to batch with base sequence 4188 being sent to 
partition input-topic-0 
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending PRODUCE 
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
clientId=producer-1, correlationId=24, headerVersion=2) and timeout 30000 to 
node 3: 
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
 (org.apache.kafka.clients.NetworkClient)
[2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Assigned 
producerId 0 and producerEpoch 0 to batch with base sequence 5553 being sent to 
partition input-topic-0 
(org.apache.kafka.clients.producer.internals.RecordAccumulator)
[2025-11-11 11:41:19,135] DEBUG [Producer clientId=producer-1] Sending PRODUCE 
request with header RequestHeader(apiKey=PRODUCE, apiVersion=13, 
clientId=producer-1, correlationId=25, headerVersion=2) and timeout 30000 to 
node 3: 
{acks=-1,timeout=30000,partitionSizes=[4rrMdxpZQnyDKO4QKjPq-A:input-topic-0=16377]}
 (org.apache.kafka.clients.NetworkClient){code}
 

The cluster and the target broker are experiencing a connectivity issue 
affecting metadata propagation. The target broker doesn't think it's the leader 
of the partition for some time, so it rejects the first requests:

 
{code:java}
[2025-11-11 11:41:20,456] DEBUG [KafkaApi-3] Produce request with correlation 
id 21 from client producer-1 on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 
failed due to org.apache.kafka.common.errors.NotLeaderOrFollowerException 
(kafka.server.KafkaApis)
[2025-11-11 11:41:24,876] DEBUG [KafkaApi-3] Produce request with correlation 
id 22 from client producer-1 on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 
failed due to org.apache.kafka.common.errors.NotLeaderOrFollowerException 
(kafka.server.KafkaApis)
{code}
Then it learns that it is the leader of the partition, but doesn't yet know about 
the other replica:

 
{code:java}
[2025-11-11 11:41:45,498] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
current ISR : 1 is insufficient to satisfy the min.isr requirement of 2 for 
partition input-topic-0, live replica(s) broker.id are : Set(3)
[2025-11-11 11:41:45,498] DEBUG [KafkaApi-3] Produce request with correlation 
id 23 from client producer-1 on partition 4rrMdxpZQnyDKO4QKjPq-A:input-topic-0 
failed due to org.apache.kafka.common.errors.NotEnoughReplicasException 
(kafka.server.KafkaApis){code}
Finally, the request with correlation ID 24 succeeds. So the first batch actually 
written to the log is the one with base sequence 4188: logically out of sequence, 
yet accepted.

 

The previously failed batches may be retried later with a new producer epoch 
(as happened in my case), but that doesn't help: the intended order of the 
records is already violated.

I boiled this down to the following test (in ReplicaManagerTest):
{noformat}
  @Test
  def testX(): Unit = {
    val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new File(_)))
    val node = new Node(0, "host0", 0)
    mockGetAliveBrokerFunctions(metadataCache, Seq(node))
    val rm = new ReplicaManager(
      metrics = metrics,
      config = config,
      time = time,
      scheduler = new MockScheduler(time),
      logManager = mockLogMgr,
      quotaManagers = quotaManager,
      metadataCache = new KRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0),
      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
      alterPartitionManager = alterPartitionManager,
      threadNamePrefix = Option(this.getClass.getName))
    try {
      val tp = new TopicPartition(topic, 0)
      val partition = rm.createPartition(tp)
      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
        Seq(new LeaderAndIsrRequest.PartitionState()
          .setTopicName(topic)
          .setPartitionIndex(0)
          .setControllerEpoch(0)
          .setLeader(config.brokerId())
          .setLeaderEpoch(0)
          .setIsr(Seq[Integer](config.brokerId()).asJava)
          .setPartitionEpoch(0)
          .setReplicas(Seq[Integer](config.brokerId()).asJava)
          .setIsNew(false)).asJava,
        Collections.singletonMap(topic, topicId),
        Set(node).asJava).build(), (_, _) => ())
      rm.getPartitionOrException(tp)
        .localLogOrException

      val topicIdPartition = new TopicIdPartition(Uuid.ZERO_UUID, partition.topicPartition)

      val producerId = 10L
      val producerEpoch: Short = 10

      val r0 = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, 0,
        new SimpleRecord("record 0".getBytes()))
      val r1 = MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, producerEpoch, 1,
        new SimpleRecord("record 1".getBytes()))

//      val order = List(r0, r1)  // normal order -- no ordering violation
      val order = List(r1, r0)  // r1 will be accepted out of order; r0 will be retried later with newer epoch and accepted

      def callback(responseStatus: Map[TopicIdPartition, PartitionResponse]): Unit = {
        println(responseStatus(topicIdPartition).error)
      }

      for (r <- order) {
        rm.appendRecords(
          timeout = 1000,
          requiredAcks = -1,
          internalTopicsAllowed = false,
          origin = AppendOrigin.CLIENT,
          entriesPerPartition = Map(topicIdPartition -> r),
          responseCallback = callback,
        )
      }
    } finally {
      rm.shutdown(checkpointHW = false)
    }
  }
{noformat}
 

As far as I understand, the partition doesn't actually need to be empty. It's 
enough for the broker's producer state to be empty and for the first requests in 
the producer epoch to fail as above.

If the first request in the producer epoch succeeds, this situation is not 
possible. For example, say we send r1, r2, r3: r1 succeeds, r2 fails with 
NotLeaderOrFollowerException, and r3 is then rejected with 
OutOfOrderSequenceException instead of being accepted.

I attribute this difference to [this piece of validation on the broker 
side|https://github.com/apache/kafka/blob/3479ce793bafc6a1c42e6afa77e3fbfc3a36c80c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java#L142-L148].
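
For illustration, here is a small self-contained model of how I read that check. 
This is my paraphrase, not the actual ProducerAppendInfo code; names and conditions 
are simplified:
{code:java}
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.record.RecordBatch;

// Illustrative model (not the real broker code) of the linked validation.
final class NewEpochSequenceCheckSketch {

    static void checkFirstSequence(long producerId,
                                   short cachedEpoch,   // RecordBatch.NO_PRODUCER_EPOCH when the broker has no state for this producer
                                   short batchEpoch,
                                   int appendFirstSeq) {
        if (batchEpoch != cachedEpoch) {
            // A non-zero first sequence in a new epoch is rejected only when the
            // broker has *some* previous state for this producer. With empty
            // producer state, any first sequence (e.g. 4188 in the logs above)
            // is accepted.
            if (appendFirstSeq != 0 && cachedEpoch != RecordBatch.NO_PRODUCER_EPOCH) {
                throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId
                    + ": first sequence " + appendFirstSeq + " (expected 0)");
            }
        }
        // Appends within the same epoch are checked against the last appended
        // sequence, which is why r3 in the example above is rejected once r2 fails.
    }
}
{code}
In the scenario above the broker has, as far as I can tell, no state at all for 
the producer, so the cached epoch is effectively NO_PRODUCER_EPOCH and the batch 
with base sequence 4188 passes the check.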

 

AFAIU, this is a bug, at least according to how I read the documentation: I 
would expect enable.idempotence to prevent this from happening. Am I right, or 
is this acceptable behavior that we should document better instead?

Can we require on the broker side that the first batch in a producer epoch 
always starts with sequence 0?
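
To make the question concrete, the stricter rule could look roughly like the 
following, mirroring the hypothetical sketch above (again just an illustration, 
not a patch):
{code:java}
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

// Hypothetical stricter variant: the first batch of a new producer epoch must
// start at sequence 0 even when the broker has no prior state for the producer.
// Under this rule the batch with base sequence 4188 (correlation ID 24) would
// have been rejected instead of becoming the first appended batch.
final class StrictNewEpochSequenceCheckSketch {

    static void checkFirstSequence(long producerId, short cachedEpoch, short batchEpoch, int appendFirstSeq) {
        if (batchEpoch != cachedEpoch && appendFirstSeq != 0) {
            throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId
                + ": first sequence " + appendFirstSeq + " (expected 0)");
        }
    }
}
{code}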



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
