jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is 
client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher 
sequence than one previously seen through verification.
+          if (batch.isTransactional && 
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
 {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) 
!= ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an 
ongoing transaction")
+            } else if (maybeLastEntry.isPresent && 
maybeLastEntry.get.tentativeSequence.isPresent && 
maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   One thing that is tricky is that the producer state entry used in the 
sequence check is actually not the 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to