jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1205792946
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, if (duplicateBatch.isPresent) { return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get())) } + + // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state. + // Also check that we are not appending a record with a higher sequence than one previously seen through verification. + if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) { + if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) { + throw new InvalidRecordException("Record was not part of an ongoing transaction") + } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence) Review Comment: Did we want to move the verification check there? Or just tentative sequence? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org