This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 77230b567ab51726302466058ca5f5e734e81664 Author: Jason Gustafson <[email protected]> AuthorDate: Sun Jul 10 10:16:39 2022 -0700 KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390) When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity since it would effectively get bundled into the next transaction in the log. Currently control records are excluded when building the offset map, but not whe [...] Reviewers: Jun Rao <[email protected]> --- core/src/main/scala/kafka/log/LogCleaner.scala | 2 + .../test/scala/unit/kafka/log/LogCleanerTest.scala | 79 ++++++++++++++++++---- 2 files changed, 69 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 12099f0f2c1..97de1db57c2 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -678,6 +678,8 @@ private[log] class Cleaner(val id: Int, if (discardBatchRecords) // The batch is only retained to preserve producer sequence information; the records can be removed false + else if (batch.isControlBatch) + true else Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats) } diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 5b942342193..253bf5490c5 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1027,6 +1027,50 @@ class LogCleanerTest { assertEquals(List(3, 4, 5), offsetsInLog(log)) } + + @Test + def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = { + val cleaner = makeCleaner(10) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps)) + val leaderEpoch = 5 + val producerEpoch = 0.toShort + + // First we append one committed transaction + val producerId1 = 1L + val appendProducer = appendTransactionalAsLeader(log, producerId1, producerEpoch, leaderEpoch) + appendProducer(Seq(1)) + log.appendAsLeader(commitMarker(producerId1, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator) + + // Now we append one transaction with a key which conflicts with the COMMIT marker appended above + def commitRecordKey(): ByteBuffer = { + val keySize = ControlRecordType.COMMIT.recordKey().sizeOf() + val key = ByteBuffer.allocate(keySize) + ControlRecordType.COMMIT.recordKey().writeTo(key) + key.flip() + key + } + + val producerId2 = 2L + val records = MemoryRecords.withTransactionalRecords( + CompressionType.NONE, + producerId2, + producerEpoch, + 0, + new SimpleRecord(time.milliseconds(), commitRecordKey(), ByteBuffer.wrap("foo".getBytes)) + ) + log.appendAsLeader(records, leaderEpoch, origin = AppendOrigin.Client) + log.appendAsLeader(commitMarker(producerId2, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator) + log.roll() + assertEquals(List(0, 1, 2, 3), offsetsInLog(log)) + + // After cleaning, the marker should not be removed + cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset)) + assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log)) + assertEquals(List(0, 1, 2, 3), offsetsInLog(log)) + } + @Test def testPartialSegmentClean(): Unit = { // because loadFactor is 0.75, this means we can fit 1 message in the map @@ -1917,20 +1961,31 @@ class LogCleanerTest { partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes)) } - private def appendTransactionalAsLeader(log: Log, - producerId: Long, - producerEpoch: Short, - leaderEpoch: Int = 0, - origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = { - appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin) + private def appendTransactionalAsLeader( + log: Log, + producerId: Long, + producerEpoch: Short, + leaderEpoch: Int = 0, + origin: AppendOrigin = AppendOrigin.Client + ): Seq[Int] => LogAppendInfo = { + appendIdempotentAsLeader( + log, + producerId, + producerEpoch, + isTransactional = true, + leaderEpoch = leaderEpoch, + origin = origin + ) } - private def appendIdempotentAsLeader(log: Log, - producerId: Long, - producerEpoch: Short, - isTransactional: Boolean = false, - leaderEpoch: Int = 0, - origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = { + private def appendIdempotentAsLeader( + log: Log, + producerId: Long, + producerEpoch: Short, + isTransactional: Boolean = false, + leaderEpoch: Int = 0, + origin: AppendOrigin = AppendOrigin.Client + ): Seq[Int] => LogAppendInfo = { var sequence = 0 keys: Seq[Int] => { val simpleRecords = keys.map { key =>
