jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205980757


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I dug in further and, interestingly, we do a different check on bumped epochs 
after markers. We also have a second check here:
   ```
               if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq) || currentEntry.tentativeSequence().isPresent())) {
                   throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
                           "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
                           " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
               }
   ```
   
   I've added the tentative sequence condition because this PR changes this 
behavior on first verification. We now put the producer epoch in the entry at 
that point, so the first case (`currentEntry.producerEpoch() == 
RecordBatch.NO_PRODUCER_EPOCH`) no longer applies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.