Repository: kafka
Updated Branches:
  refs/heads/0.11.0 9ae4deaa5 -> 33216846f
MINOR: Broker should disallow downconversion of transactional/idempotent records

Author: Jason Gustafson <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3118 from hachikuji/disallow-transactional-idempotent-downconversion

(cherry picked from commit e3e2f1d22d17a20ccccf67218c600e7e1647e1ca)
Signed-off-by: Jason Gustafson <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/33216846
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/33216846
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/33216846

Branch: refs/heads/0.11.0
Commit: 33216846fe070dfb5dd09eab9f53490bae92c357
Parents: 9ae4dea
Author: Jason Gustafson <[email protected]>
Authored: Mon May 22 20:00:07 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Mon May 22 20:01:44 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecords.java       |  6 +++
 .../kafka/common/requests/ProduceResponse.java   |  1 +
 .../src/main/scala/kafka/log/LogValidator.scala  | 40 ++++++++++--------
 .../scala/unit/kafka/log/LogValidatorTest.scala  | 44 +++++++++++++++++++-
 4 files changed, 72 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
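In short: a batch that is transactional or carries a producer id requires the v2
(0.11) message format, so when a topic is still on an older
message.format.version the broker now fails the produce request rather than
downconverting the batch and silently dropping the producer metadata. A minimal
illustration of the client-visible effect, as a hedged sketch: the broker
address and the topic name "old-format-topic" are hypothetical, and it assumes
a topic whose message format predates 0.11:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // With this patch the broker rejects the idempotent batch instead of
    // downconverting it; the send future fails with
    // UnsupportedForMessageFormatException (error code 43).
    producer.send(new ProducerRecord("old-format-topic", "key", "value")).get()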
http://git-wip-us.apache.org/repos/asf/kafka/blob/33216846/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index cec309e..7391e7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -443,6 +443,12 @@ public class MemoryRecords extends AbstractRecords {
                 RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records);
     }
 
+    public static MemoryRecords withIdempotentRecords(CompressionType compressionType, long producerId,
+                                                      short producerEpoch, int baseSequence, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
+                baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, false, records);
+    }
+
     public static MemoryRecords withIdempotentRecords(byte magic, long initialOffset, CompressionType compressionType,
                                                       long producerId, short producerEpoch, int baseSequence,
                                                       int partitionLeaderEpoch, SimpleRecord... records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/33216846/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 06c1f6e..42ae434 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -60,6 +60,7 @@ public class ProduceResponse extends AbstractResponse {
  *   NOT_ENOUGH_REPLICAS_AFTER_APPEND (20)
  *   INVALID_REQUIRED_ACKS (21)
  *   TOPIC_AUTHORIZATION_FAILED (29)
+ *   UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
  */
 
 private static final String BASE_OFFSET_KEY_NAME = "base_offset";
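The new MemoryRecords overload is a small convenience for building a
current-magic (v2) idempotent batch at base offset 0, without the caller
spelling out magic, initial offset, or partition leader epoch. A usage sketch,
mirroring the values used in the new tests further down:

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    // producerId/epoch/sequence values are arbitrary illustrations.
    val batch = MemoryRecords.withIdempotentRecords(
      CompressionType.NONE,
      1344L,       // producerId
      16.toShort,  // producerEpoch
      0,           // baseSequence
      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes))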
http://git-wip-us.apache.org/repos/asf/kafka/blob/33216846/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 33257fd..ee5cb58 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import kafka.common.LongRef
 import kafka.message.{CompressionCodec, NoCompressionCodec}
 import kafka.utils.Logging
-import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 
 import scala.collection.mutable
@@ -62,14 +62,14 @@ private[kafka] object LogValidator extends Logging {
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
-          partitionLeaderEpoch, isFromClient)
+          partitionLeaderEpoch, isFromClient, magic)
     } else {
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
         magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
     }
   }
 
-  private def validateBatch(batch: RecordBatch, isFromClient: Boolean): Unit = {
+  private def validateBatch(batch: RecordBatch, isFromClient: Boolean, toMagic: Byte): Unit = {
     if (isFromClient) {
       if (batch.hasProducerId && batch.baseSequence < 0)
         throw new InvalidRecordException(s"Invalid sequence number ${batch.baseSequence} in record batch " +
@@ -78,6 +78,12 @@ private[kafka] object LogValidator extends Logging {
       if (batch.isControlBatch)
         throw new InvalidRecordException("Clients are not allowed to write control records")
     }
+
+    if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
+      throw new UnsupportedForMessageFormatException(s"Transactional records cannot be used with magic version $toMagic")
+
+    if (batch.hasProducerId && toMagic < RecordBatch.MAGIC_VALUE_V2)
+      throw new UnsupportedForMessageFormatException(s"Idempotent records cannot be used with magic version $toMagic")
   }
 
   private def validateRecord(batch: RecordBatch, record: Record, now: Long, timestampType: TimestampType,
@@ -118,7 +124,7 @@ private[kafka] object LogValidator extends Logging {
       offsetCounter.value, now, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient)
+      validateBatch(batch, isFromClient, toMagicValue)
 
       for (record <- batch.asScala) {
         validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
@@ -142,14 +148,14 @@ private[kafka] object LogValidator extends Logging {
                                          timestampType: TimestampType,
                                          timestampDiffMaxMs: Long,
                                          partitionLeaderEpoch: Int,
-                                         isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+                                         isFromClient: Boolean,
+                                         magic: Byte): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
-    var isMagicV2 = false
 
     for (batch <- records.batches.asScala) {
-      validateBatch(batch, isFromClient)
+      validateBatch(batch, isFromClient, magic)
 
       var maxBatchTimestamp = RecordBatch.NO_TIMESTAMP
       var offsetOfMaxBatchTimestamp = -1L
@@ -179,13 +185,11 @@ private[kafka] object LogValidator extends Logging {
         else
           batch.setMaxTimestamp(timestampType, maxBatchTimestamp)
       }
-
-      isMagicV2 = batch.magic >= RecordBatch.MAGIC_VALUE_V2
     }
 
     if (timestampType == TimestampType.LOG_APPEND_TIME) {
       maxTimestamp = now
-      if (isMagicV2)
+      if (magic >= RecordBatch.MAGIC_VALUE_V2)
         offsetOfMaxTimestamp = offsetCounter.value - 1
       else
         offsetOfMaxTimestamp = initialOffset
@@ -211,21 +215,21 @@ private[kafka] object LogValidator extends Logging {
                                                  sourceCodec: CompressionCodec,
                                                  targetCodec: CompressionCodec,
                                                  compactedTopic: Boolean,
-                                                 magic: Byte,
+                                                 toMagic: Byte,
                                                  timestampType: TimestampType,
                                                  timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int,
                                                  isFromClient: Boolean): ValidationAndOffsetAssignResult = {
 
       // No in place assignment situation 1 and 2
-      var inPlaceAssignment = sourceCodec == targetCodec && magic > RecordBatch.MAGIC_VALUE_V0
+      var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0
 
       var maxTimestamp = RecordBatch.NO_TIMESTAMP
       val expectedInnerOffset = new LongRef(0)
       val validatedRecords = new mutable.ArrayBuffer[Record]
 
       for (batch <- records.batches.asScala) {
-        validateBatch(batch, isFromClient)
+        validateBatch(batch, isFromClient, toMagic)
 
         // Do not compress control records unless they are written compressed
         if (sourceCodec == NoCompressionCodec && batch.isControlBatch)
@@ -236,7 +240,7 @@ private[kafka] object LogValidator extends Logging {
           if (sourceCodec != NoCompressionCodec && record.isCompressed)
             throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
               s"compression attribute set: $record")
-          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && magic > RecordBatch.MAGIC_VALUE_V0) {
+          if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
             // Check if we need to overwrite offset
             // No in place assignment situation 3
             if (record.offset != expectedInnerOffset.getAndIncrement())
@@ -246,7 +250,7 @@ private[kafka] object LogValidator extends Logging {
           }
 
           // No in place assignment situation 4
-          if (!record.hasMagic(magic))
+          if (!record.hasMagic(toMagic))
             inPlaceAssignment = false
 
           validatedRecords += record
@@ -261,7 +265,7 @@ private[kafka] object LogValidator extends Logging {
           val first = records.batches.asScala.head
           (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
         }
-        buildRecordsAndAssignOffsets(magic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
+        buildRecordsAndAssignOffsets(toMagic, offsetCounter, timestampType, CompressionType.forId(targetCodec.codec), now,
          validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch)
       } else {
         // we can update the batch only and write the compressed payload as is
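For reference, the UnsupportedForMessageFormatException thrown by validateBatch
is what surfaces as the UNSUPPORTED_FOR_MESSAGE_FORMAT (43) partition error
documented in ProduceResponse above. A small sketch of that mapping, assuming
the standard Errors.forException lookup in the clients library:

    import org.apache.kafka.common.errors.UnsupportedForMessageFormatException
    import org.apache.kafka.common.protocol.Errors

    // The broker maps the exception to the wire error code, which the
    // partition-level produce response then carries back to the producer.
    val error = Errors.forException(
      new UnsupportedForMessageFormatException("Idempotent records cannot be used with magic version 1"))
    assert(error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) // code 43 on the wire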
@@ -273,10 +277,10 @@ private[kafka] object LogValidator extends Logging {
         if (timestampType == TimestampType.LOG_APPEND_TIME)
           maxTimestamp = now
 
-        if (magic >= RecordBatch.MAGIC_VALUE_V1)
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
           batch.setMaxTimestamp(timestampType, maxTimestamp)
 
-        if (magic >= RecordBatch.MAGIC_VALUE_V2)
+        if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
           batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
 
         ValidationAndOffsetAssignResult(validatedRecords = records,

http://git-wip-us.apache.org/repos/asf/kafka/blob/33216846/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 61fae80..f40745d 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer
 
 import kafka.common.LongRef
 import kafka.message.{DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
-import org.apache.kafka.common.errors.InvalidTimestampException
+import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.test.TestUtils
 import org.junit.Assert._
@@ -807,6 +807,48 @@ class LogValidatorTest {
       isFromClient = true).validatedRecords, offset)
   }
 
+  @Test(expected = classOf[UnsupportedForMessageFormatException])
+  def testDownConversionOfTransactionalRecordsNotPermitted() {
+    val offset = 1234567
+    val producerId = 1344L
+    val producerEpoch = 16.toShort
+    val sequence = 0
+    val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true).validatedRecords, offset)
+  }
+
+  @Test(expected = classOf[UnsupportedForMessageFormatException])
+  def testDownConversionOfIdempotentRecordsNotPermitted() {
+    val offset = 1234567
+    val producerId = 1344L
+    val producerEpoch = 16.toShort
+    val sequence = 0
+    val records = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+      new SimpleRecord("hello".getBytes), new SimpleRecord("there".getBytes), new SimpleRecord("beautiful".getBytes))
+    checkOffsets(LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = DefaultCompressionCodec,
+      targetCodec = DefaultCompressionCodec,
+      compactedTopic = false,
+      magic = RecordBatch.MAGIC_VALUE_V1,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true).validatedRecords, offset)
+  }
+
   @Test
   def testOffsetAssignmentAfterDownConversionV2ToV0NonCompressed() {
     val offset = 1234567
