jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
   I dug in further and interestingly we do a different check on bumped epochs after markers.
   We do have a second check here:

   ```
   if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH ||
           inSequence(currentLastSeq, appendFirstSeq) ||
           currentEntry.tentativeSequence().isPresent())) {
       throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
           "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
           " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
   }
   ```

   I've added the tentative sequence change because this PR actually changes this behavior on first verification. We put the producer epoch in the entry, so that first case no longer applies.
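To make the reasoning in the review comment concrete, here is a minimal, self-contained sketch of the three-way condition it quotes. It is not the Kafka implementation: `SequenceCheckSketch`, `passesSequenceCheck`, and the use of `OptionalInt` for the tentative sequence are assumptions made for illustration, and `NO_PRODUCER_EPOCH` and `inSequence` are local stand-ins for the identifiers named in the comment.

```java
import java.util.OptionalInt;

// Hypothetical stand-in for the check quoted in the review comment.
// All names here are local to this sketch, not Kafka's classes.
final class SequenceCheckSketch {
    // Stand-in for RecordBatch.NO_PRODUCER_EPOCH (assumed to be -1).
    static final short NO_PRODUCER_EPOCH = -1;

    // True when nextSeq directly follows lastSeq, allowing wrap-around
    // from Integer.MAX_VALUE back to 0. The 1L forces long arithmetic
    // so that lastSeq + 1 cannot overflow.
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L
            || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    // Returns true when the append would NOT raise an out-of-order error,
    // that is, when any of the three escape conditions holds.
    static boolean passesSequenceCheck(short currentEpoch,
                                       int currentLastSeq,
                                       int appendFirstSeq,
                                       OptionalInt tentativeSequence) {
        return currentEpoch == NO_PRODUCER_EPOCH          // case 1: no epoch recorded yet
            || inSequence(currentLastSeq, appendFirstSeq) // case 2: next expected sequence
            || tentativeSequence.isPresent();             // case 3: tentative sequence set during verification
    }

    public static void main(String[] args) {
        // Epoch already stored in the entry, no batches written, no tentative sequence:
        // neither case 1 nor case 2 applies, so the check fails.
        System.out.println(passesSequenceCheck((short) 3, -1, 7, OptionalInt.empty()));
        // Same state, but with a tentative sequence recorded: case 3 lets the append through.
        System.out.println(passesSequenceCheck((short) 3, -1, 7, OptionalInt.of(7)));
    }
}
```

The first call in `main` models the situation the comment describes: once verification stores the producer epoch in the entry, the `NO_PRODUCER_EPOCH` case can no longer admit the first append, so in this sketch only the tentative-sequence case does.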