jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205794685

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   One thing that is tricky is that the producer state entry used in the sequence check is actually not the