jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   Did we want to move the verification check there, or just the tentative sequence check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
