Repository: kafka Updated Branches: refs/heads/0.11.0 93308d101 -> cad725a2f
KAFKA-5353; baseTimestamp should always have a create timestamp This makes the case where we build the records from scratch consistent with the case where we update the batch header "in place". Thanks to edenhill who found the issue while testing librdkafka. The reason our tests don't catch this is that we rely on the maxTimestamp to compute the record level timestamps if log append time is used. Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3177 from ijuma/set-base-sequence-for-log-append-time (cherry picked from commit 647afeff6a2e3fd78328f6989e8d9f96bcde5121) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cad725a2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cad725a2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cad725a2 Branch: refs/heads/0.11.0 Commit: cad725a2f68dce0fbbfa6063cc9d2cd63bf49c7c Parents: 93308d1 Author: Ismael Juma <[email protected]> Authored: Thu Jun 1 00:16:55 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu Jun 1 00:17:18 2017 +0100 ---------------------------------------------------------------------- .../kafka/common/record/DefaultRecordBatch.java | 8 ++++- .../common/record/MemoryRecordsBuilder.java | 8 ++--- .../kafka/common/record/MutableRecordBatch.java | 8 +++-- .../apache/kafka/common/record/RecordBatch.java | 6 ++-- .../scala/unit/kafka/log/LogValidatorTest.scala | 35 +++++++++++++++----- 5 files changed, 45 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cad725a2/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index bdba860..7a0e530 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -131,7 +131,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe + ", computed crc = " + computeChecksum() + ")"); } - private long baseTimestamp() { + /** + * Get the timestamp of the first record in this batch. It is always the create time of the record even if the + * timestamp type of the batch is log append time. + * + * @return The base timestamp + */ + public long baseTimestamp() { return buffer.getLong(BASE_TIMESTAMP_OFFSET); } http://git-wip-us.apache.org/repos/asf/kafka/blob/cad725a2/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index aaca851..66560ca 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -331,15 +331,11 @@ public class MemoryRecordsBuilder { int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD; int offsetDelta = (int) (lastOffset - baseOffset); - final long baseTimestamp; final long maxTimestamp; - if (timestampType == TimestampType.LOG_APPEND_TIME) { - baseTimestamp = logAppendTime; + if (timestampType == TimestampType.LOG_APPEND_TIME) maxTimestamp = logAppendTime; - } else { - baseTimestamp = this.baseTimestamp; + else maxTimestamp = this.maxTimestamp; - } DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, baseTimestamp, maxTimestamp, producerId, 
producerEpoch, baseSequence, isTransactional, isControlBatch, http://git-wip-us.apache.org/repos/asf/kafka/blob/cad725a2/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java index 728b6eb..8049469 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MutableRecordBatch.java @@ -32,8 +32,12 @@ public interface MutableRecordBatch extends RecordBatch { /** * Set the max timestamp for this batch. When using log append time, this effectively overrides the individual - * timestamps of all the records contained in the batch. Note that this typically requires re-computation - * of the batch's CRC. + * timestamps of all the records contained in the batch. To avoid recompression, the record fields are not updated + * by this method, but clients ignore them if the timestamp type is log append time. Note that baseTimestamp is not + * updated by this method. + * + * This typically requires re-computation of the batch's CRC. 
+ * * @param timestampType The timestamp type * @param maxTimestamp The maximum timestamp */ http://git-wip-us.apache.org/repos/asf/kafka/blob/cad725a2/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java index ef773da..65a6a95 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java @@ -80,8 +80,10 @@ public interface RecordBatch extends Iterable<Record> { long checksum(); /** - * Get the timestamp of this record batch. This is the max timestamp among all records contained in this batch. - * This value is updated during compaction. + * Get the max timestamp or log append time of this record batch. + * + * If the timestamp type is create time, this is the max timestamp among all records contained in this batch and + * the value is updated during compaction. 
* * @return The max timestamp */ http://git-wip-us.apache.org/repos/asf/kafka/blob/cad725a2/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala index f40745d..3ab9732 100644 --- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala @@ -38,7 +38,7 @@ class LogValidatorTest { private def checkLogAppendTimeNonCompressed(magic: Byte) { val now = System.currentTimeMillis() // The timestamps should be overwritten - val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.NONE) + val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.NONE) val validatedResults = LogValidator.validateMessagesAndAssignOffsets(records, offsetCounter = new LongRef(0), now = now, @@ -52,7 +52,7 @@ class LogValidatorTest { isFromClient = true) val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) - validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch)) + validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp) assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged) @@ -86,7 +86,7 @@ class LogValidatorTest { val validatedRecords = validatedResults.validatedRecords assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) - validatedRecords.batches.asScala.foreach(batch => 
validateLogAppendTime(now, batch)) + validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, -1, batch)) assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}", @@ -107,7 +107,7 @@ class LogValidatorTest { private def checkLogAppendTimeWithoutRecompression(magic: Byte) { val now = System.currentTimeMillis() // The timestamps should be overwritten - val records = createRecords(magicValue = magic, timestamp = 0L, codec = CompressionType.GZIP) + val records = createRecords(magicValue = magic, timestamp = 1234L, codec = CompressionType.GZIP) val validatedResults = LogValidator.validateMessagesAndAssignOffsets( records, offsetCounter = new LongRef(0), @@ -124,7 +124,7 @@ class LogValidatorTest { assertEquals("message set size should not change", records.records.asScala.size, validatedRecords.records.asScala.size) - validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, batch)) + validatedRecords.batches.asScala.foreach(batch => validateLogAppendTime(now, 1234L, batch)) assertTrue("MessageSet should still valid", validatedRecords.batches.iterator.next().isValid) assertEquals(s"Max timestamp should be $now", now, validatedResults.maxTimestamp) assertEquals(s"The offset of max timestamp should be ${records.records.asScala.size - 1}", @@ -176,6 +176,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) assertEquals(batch.timestampType, TimestampType.CREATE_TIME) + maybeCheckBaseTimestamp(timestampSeq(0), batch) assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) assertEquals(producerEpoch, batch.producerEpoch) assertEquals(producerId, batch.producerId) @@ -237,6 +238,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { 
assertTrue(batch.isValid) assertEquals(batch.timestampType, TimestampType.CREATE_TIME) + maybeCheckBaseTimestamp(timestampSeq(0), batch) assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) assertEquals(producerEpoch, batch.producerEpoch) assertEquals(producerId, batch.producerId) @@ -280,6 +282,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) + maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch) assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp) assertEquals(TimestampType.CREATE_TIME, batch.timestampType) assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch) @@ -316,6 +319,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) + maybeCheckBaseTimestamp(timestamp, batch) assertEquals(timestamp, batch.maxTimestamp) assertEquals(TimestampType.CREATE_TIME, batch.timestampType) assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch) @@ -367,6 +371,7 @@ class LogValidatorTest { for (batch <- validatedRecords.batches.asScala) { assertTrue(batch.isValid) assertEquals(batch.timestampType, TimestampType.CREATE_TIME) + maybeCheckBaseTimestamp(timestampSeq(0), batch) assertEquals(batch.maxTimestamp, batch.asScala.map(_.timestamp).max) assertEquals(producerEpoch, batch.producerEpoch) assertEquals(producerId, batch.producerId) @@ -945,13 +950,25 @@ class LogValidatorTest { builder.build() } - def validateLogAppendTime(now: Long, batch: RecordBatch) { + def maybeCheckBaseTimestamp(expected: Long, batch: RecordBatch): Unit = { + batch match { + case b: DefaultRecordBatch => + assertEquals(s"Unexpected base timestamp of batch $batch", expected, b.baseTimestamp) + case _ => // no-op + } + } + + /** + * expectedLogAppendTime is only checked if batch.magic is V2 or higher + */ + def validateLogAppendTime(expectedLogAppendTime: Long, expectedBaseTimestamp: Long, batch: RecordBatch) { assertTrue(batch.isValid) - 
assertTrue(batch.timestampType() == TimestampType.LOG_APPEND_TIME) - assertEquals(s"Timestamp of message $batch should be $now", now, batch.maxTimestamp) + assertTrue(batch.timestampType == TimestampType.LOG_APPEND_TIME) + assertEquals(s"Unexpected max timestamp of batch $batch", expectedLogAppendTime, batch.maxTimestamp) + maybeCheckBaseTimestamp(expectedBaseTimestamp, batch) for (record <- batch.asScala) { assertTrue(record.isValid) - assertEquals(s"Timestamp of message $record should be $now", now, record.timestamp) + assertEquals(s"Unexpected timestamp of record $record", expectedLogAppendTime, record.timestamp) } }
