Repository: kafka
Updated Branches:
  refs/heads/0.11.0 c62793eab -> 5dd2a49e6
MINOR: Preserve the base offset of the original record batch in V2

The previous code did not handle this correctly if a batch was compacted
more than once. Also add test case for duplicate check after log cleaning
and improve various comments.

Author: Jason Gustafson <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #3145 from hachikuji/minor-improve-base-sequence-docs

(cherry picked from commit 37433638271718344498d695d5da08db12c24eed)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5dd2a49e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5dd2a49e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5dd2a49e

Branch: refs/heads/0.11.0
Commit: 5dd2a49e63685069bab5fd21a6cc426553a5a1de
Parents: c62793e
Author: Jason Gustafson <[email protected]>
Authored: Fri May 26 09:41:17 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Fri May 26 09:50:50 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  4 +-
 .../record/AbstractLegacyRecordBatch.java       |  2 +-
 .../kafka/common/record/DefaultRecord.java      |  4 +-
 .../kafka/common/record/DefaultRecordBatch.java |  9 +-
 .../kafka/common/record/MemoryRecords.java      |  9 +-
 .../common/record/MemoryRecordsBuilder.java     |  4 +-
 .../org/apache/kafka/common/record/Record.java  |  2 +-
 .../apache/kafka/common/record/RecordBatch.java | 24 +++--
 .../kafka/common/record/MemoryRecordsTest.java  | 64 +++++++++++++-
 core/src/main/scala/kafka/log/Log.scala         |  5 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 92 ++++++++++++++++----
 11 files changed, 178 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 01bd0e5..cd32850 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1035,9 +1035,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                     if (containsAbortMarker(currentBatch)) {
                         abortedProducerIds.remove(producerId);
                     } else if (isBatchAborted(currentBatch)) {
-                        log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}",
+                        log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition {}",
                                   producerId, currentBatch.baseOffset(), partition);
-                        nextFetchOffset = currentBatch.lastOffset() + 1;
+                        nextFetchOffset = currentBatch.nextOffset();
                         continue;
                     }
                 }
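The Fetcher change above is behavior-preserving: nextOffset() is defined as the offset following the
batch, i.e. lastOffset() + 1. A minimal sketch of why that is the right skip target (the wrapper class
is illustrative only, not part of the patch):

    import org.apache.kafka.common.record.RecordBatch;

    final class SkipAbortedBatch {
        // For v2 batches, lastOffset() reflects the last offset of the original
        // batch even after compaction, so nextOffset() always advances the fetch
        // position past the entire aborted batch, not just past the records that
        // happen to survive cleaning.
        static long skipPast(RecordBatch batch) {
            return batch.nextOffset(); // equivalent to batch.lastOffset() + 1
        }
    }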
http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6ce3ba3..e028988 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -186,7 +186,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }

     @Override
-    public long sequence() {
+    public int sequence() {
         return RecordBatch.NO_SEQUENCE;
     }


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 9d0cd7e..05b5bb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -60,7 +60,7 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  * ----------------
  *
  * The offset and timestamp deltas compute the difference relative to the base offset and
- * base timestamp of the log entry that this record is contained in.
+ * base timestamp of the batch that this record is contained in.
  */
 public class DefaultRecord implements Record {
@@ -102,7 +102,7 @@ public class DefaultRecord implements Record {
     }

     @Override
-    public long sequence() {
+    public int sequence() {
         return sequence;
     }
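The delta encoding mentioned in the DefaultRecord comment above can be illustrated with a small
self-contained sketch (plain Java; the class and helper names are illustrative, not part of the
Kafka codebase):

    // A v2 record stores only deltas; absolute values are reconstructed from
    // the containing batch's header fields.
    final class RecordDeltas {
        static long absoluteOffset(long batchBaseOffset, int offsetDelta) {
            return batchBaseOffset + offsetDelta;
        }

        static long absoluteTimestamp(long batchBaseTimestamp, long timestampDelta) {
            return batchBaseTimestamp + timestampDelta;
        }
    }

For example, in a batch with base offset 100 and base timestamp 1000, a record carrying
offsetDelta = 2 and timestampDelta = 25 sits at offset 102 with timestamp 1025.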
http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 13f958d..4e52d61 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -44,7 +44,7 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 *  Magic => Int8
 *  CRC => Uint32
 *  Attributes => Int16
- *  LastOffsetDelta => Int32
+ *  LastOffsetDelta => Int32 // also serves as LastSequenceDelta
 *  BaseTimestamp => Int64
 *  MaxTimestamp => Int64
 *  ProducerId => Int64
@@ -61,6 +61,13 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
 * computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by
 * the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
 *
+ * On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
+ * numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
+ * producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then
+ * after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
+ * be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying
+ * that the first and last sequence numbers of the incoming batch match the last from that producer).
+ *
 * The current attributes are given below:
 *
 * -------------------------------------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 7391e7e..d3bdee2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -141,13 +141,9 @@ public class MemoryRecords extends AbstractRecords {
             byte batchMagic = batch.magic();
             boolean writeOriginalEntry = true;
-            long firstOffset = -1;
             List<Record> retainedRecords = new ArrayList<>();

             for (Record record : batch) {
-                if (firstOffset < 0)
-                    firstOffset = record.offset();
-
                 messagesRead += 1;

                 if (filter.shouldRetain(batch, record)) {
@@ -178,8 +174,11 @@ public class MemoryRecords extends AbstractRecords {
                 ByteBuffer slice = destinationBuffer.slice();
                 TimestampType timestampType = batch.timestampType();
                 long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+                long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ?
+                        batch.baseOffset() : retainedRecords.get(0).offset();
+
                 MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
-                        firstOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
+                        baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
                         batch.isTransactional(), batch.partitionLeaderEpoch());

                 for (Record record : retainedRecords)
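The effect of the new baseOffset computation can be exercised end to end through the same APIs the
new test below uses. This is a minimal sketch, assuming the 0.11 filterTo(RecordFilter, ByteBuffer)
signature shown in the test and a "retain keyed records" filter comparable to the test's
RetainNonNullKeysFilter; it is not the patch itself:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.*;

    public class BaseOffsetAfterFilterSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(2048);
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.appendWithOffset(0L, 10L, null, "a".getBytes());           // no key: filtered out
            builder.appendWithOffset(1L, 11L, "1".getBytes(), "b".getBytes()); // keyed: retained
            builder.close();
            buffer.flip();

            ByteBuffer filtered = ByteBuffer.allocate(2048);
            MemoryRecords.readableRecords(buffer).filterTo(new MemoryRecords.RecordFilter() {
                @Override
                public boolean shouldRetain(RecordBatch batch, Record record) {
                    return record.key() != null; // keep only records with keys
                }
            }, filtered);
            filtered.flip();

            MutableRecordBatch batch = MemoryRecords.readableRecords(filtered).batches().iterator().next();
            // With this patch the rewritten v2 batch keeps the original base offset (0)
            // rather than the offset of the first surviving record (1), so a second
            // round of cleaning cannot shift it again.
            System.out.println(batch.baseOffset() + ".." + batch.lastOffset()); // prints 0..1
        }
    }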
http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index bc25d75..e055aa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -213,7 +213,7 @@ public class MemoryRecordsBuilder {
         }
     }

-    public void setProducerState(long producerId, short epoch, int baseSequence) {
+    public void setProducerState(long producerId, short producerEpoch, int baseSequence) {
         if (isClosed()) {
             // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
             // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
@@ -222,7 +222,7 @@
             throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
         }
         this.producerId = producerId;
-        this.producerEpoch = epoch;
+        this.producerEpoch = producerEpoch;
         this.baseSequence = baseSequence;
     }


http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 6de28c3..ab52bef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -38,7 +38,7 @@ public interface Record {
      * Get the sequence number assigned by the producer.
      * @return the sequence number
      */
-    long sequence();
+    int sequence();

     /**
      * Get the size in bytes of this record.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 42b0c2e..db75105 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -96,10 +96,11 @@ public interface RecordBatch extends Iterable<Record> {
     TimestampType timestampType();

     /**
-     * Get the first offset contained in this record batch. For magic version prior to 2, this generally
-     * requires deep iteration and will return the offset of the first record in the record batch. For
-     * magic version 2 and above, this will return the first offset of the original record batch (i.e.
-     * prior to compaction). For non-compacted topics, the behavior is equivalent.
+     * Get the base offset contained in this record batch. For magic version prior to 2, the base offset will
+     * always be the offset of the first message in the batch. This generally requires deep iteration and will
+     * return the offset of the first record in the record batch. For magic version 2 and above, this will return
+     * the first offset of the original record batch (i.e. prior to compaction). For non-compacted topics, the
+     * behavior is equivalent.
      *
      * Because this requires deep iteration for older magic versions, this method should be used with
      * caution. Generally {@link #lastOffset()} is safer since access is efficient for all magic versions.
@@ -110,8 +111,9 @@ public interface RecordBatch extends Iterable<Record> {
     long baseOffset();

     /**
-     * Get the last offset in this record batch (inclusive). Unlike {@link #baseOffset()}, the last offset
-     * always reflects the offset of the last record in the batch, even after compaction.
+     * Get the last offset in this record batch (inclusive). Just like {@link #baseOffset()}, the last offset
+     * always reflects the offset of the last record in the original batch, even if it is removed during log
+     * compaction.
      *
      * @return The offset of the last record in this batch
      */
@@ -132,7 +134,7 @@ public interface RecordBatch extends Iterable<Record> {
     byte magic();

     /**
-     * Get the producer id for this log record batch. For older magic versions, this will return 0.
+     * Get the producer id for this log record batch. For older magic versions, this will return -1.
      *
      * @return The producer id or -1 if there is none
      */
@@ -151,13 +153,17 @@ public interface RecordBatch extends Iterable<Record> {
     boolean hasProducerId();

     /**
-     * Get the first sequence number of this record batch.
+     * Get the base sequence number of this record batch. Like {@link #baseOffset()}, this value is not
+     * affected by compaction: it always retains the base sequence number from the original batch.
+     *
      * @return The first sequence number or -1 if there is none
      */
     int baseSequence();

     /**
-     * Get the last sequence number of this record batch.
+     * Get the last sequence number of this record batch. Like {@link #lastOffset()}, the last sequence number
+     * always reflects the sequence number of the last record in the original batch, even if it is removed during log
+     * compaction.
      *
      * @return The last sequence number or -1 if there is none
      */
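The duplicate check that motivates preserving the base sequence (see the DefaultRecordBatch comment
earlier in this diff) reduces to comparing two numbers, both of which survive compaction precisely
because the v2 header stores BaseSequence and LastOffsetDelta. A hedged sketch with illustrative
names, not the broker's actual ProducerStateManager types:

    import org.apache.kafka.common.record.RecordBatch;

    final class DuplicateCheckSketch {
        // Per the v2 header comment: the last sequence is derived, so it is
        // preserved as long as BaseSequence and LastOffsetDelta are preserved.
        static int lastSequence(int baseSequence, int lastOffsetDelta) {
            return baseSequence == RecordBatch.NO_SEQUENCE
                    ? RecordBatch.NO_SEQUENCE
                    : baseSequence + lastOffsetDelta;
        }

        // An incoming idempotent batch is treated as a duplicate of the producer's
        // most recently appended batch iff both its base and last sequence numbers match.
        static boolean isDuplicate(int lastBaseSequence, int lastLastSequence, RecordBatch incoming) {
            return incoming.baseSequence() != RecordBatch.NO_SEQUENCE
                    && incoming.baseSequence() == lastBaseSequence
                    && incoming.lastSequence() == lastLastSequence;
        }
    }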
"6".getBytes(), "g".getBytes()); - builder.append(17L, null, "h".getBytes()); - builder.append(18L, "8".getBytes(), "i".getBytes()); + builder.append(17L, "7".getBytes(), "h".getBytes()); + builder.append(18L, null, "i".getBytes()); builder.close(); buffer.flip(); @@ -356,6 +400,10 @@ public class MemoryRecordsTest { assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.baseSequence()); assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.lastSequence()); assertFalse(firstBatch.isTransactional()); + List<Record> firstBatchRecords = TestUtils.toList(firstBatch); + assertEquals(1, firstBatchRecords.size()); + assertEquals(RecordBatch.NO_SEQUENCE, firstBatchRecords.get(0).sequence()); + assertEquals(new SimpleRecord(11L, "1".getBytes(), "b".getBytes()), new SimpleRecord(firstBatchRecords.get(0))); MutableRecordBatch secondBatch = batches.get(1); assertEquals(2, secondBatch.countOrNull().intValue()); @@ -366,6 +414,12 @@ public class MemoryRecordsTest { assertEquals(baseSequence1, secondBatch.baseSequence()); assertEquals(baseSequence1 + 2, secondBatch.lastSequence()); assertFalse(secondBatch.isTransactional()); + List<Record> secondBatchRecords = TestUtils.toList(secondBatch); + assertEquals(2, secondBatchRecords.size()); + assertEquals(baseSequence1 + 1, secondBatchRecords.get(0).sequence()); + assertEquals(new SimpleRecord(14L, "4".getBytes(), "e".getBytes()), new SimpleRecord(secondBatchRecords.get(0))); + assertEquals(baseSequence1 + 2, secondBatchRecords.get(1).sequence()); + assertEquals(new SimpleRecord(15L, "5".getBytes(), "f".getBytes()), new SimpleRecord(secondBatchRecords.get(1))); MutableRecordBatch thirdBatch = batches.get(2); assertEquals(2, thirdBatch.countOrNull().intValue()); @@ -376,6 +430,12 @@ public class MemoryRecordsTest { assertEquals(baseSequence2, thirdBatch.baseSequence()); assertEquals(baseSequence2 + 2, thirdBatch.lastSequence()); assertTrue(thirdBatch.isTransactional()); + List<Record> thirdBatchRecords = TestUtils.toList(thirdBatch); + assertEquals(2, thirdBatchRecords.size()); + assertEquals(baseSequence2, thirdBatchRecords.get(0).sequence()); + assertEquals(new SimpleRecord(16L, "6".getBytes(), "g".getBytes()), new SimpleRecord(thirdBatchRecords.get(0))); + assertEquals(baseSequence2 + 1, thirdBatchRecords.get(1).sequence()); + assertEquals(new SimpleRecord(17L, "7".getBytes(), "h".getBytes()), new SimpleRecord(thirdBatchRecords.get(1))); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 55eb46a..67b9271 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -56,7 +56,8 @@ object LogAppendInfo { /** * Struct to hold various quantities we compute about each message set before appending to the log * - * @param firstOffset The first offset in the message set + * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending + * to the follower. In that case, this will be the last offset for performance reasons. * @param lastOffset The last offset in the message set * @param maxTimestamp The maximum timestamp of the message set. * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp. @@ -750,6 +751,8 @@ class Log(@volatile var dir: File, // update the first offset if on the first message. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 55eb46a..67b9271 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -56,7 +56,8 @@ object LogAppendInfo {
 /**
  * Struct to hold various quantities we compute about each message set before appending to the log
  *
- * @param firstOffset The first offset in the message set
+ * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
+ *                    to the follower. In that case, this will be the last offset for performance reasons.
  * @param lastOffset The last offset in the message set
  * @param maxTimestamp The maximum timestamp of the message set.
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
@@ -750,6 +751,8 @@ class Log(@volatile var dir: File,
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.
+      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
+      // case, validation will be more lenient.
       if (firstOffset < 0)
         firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset
           else batch.lastOffset
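The selection logic in the Log.scala hunk above, expressed as a standalone Java mirror for
illustration (the Scala line in the diff is authoritative; the wrapper class is hypothetical):

    import org.apache.kafka.common.record.RecordBatch;

    final class FirstOffsetSketch {
        // v2 exposes the base offset directly in the batch header, so it can be
        // read cheaply; for older formats we fall back to the last offset, which
        // is available without decompressing the wrapper message.
        static long firstOffsetFor(RecordBatch batch) {
            return batch.magic() >= RecordBatch.MAGIC_VALUE_V2
                    ? batch.baseOffset()
                    : batch.lastOffset();
        }
    }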
http://git-wip-us.apache.org/repos/asf/kafka/blob/5dd2a49e/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 6eb65ca..a280679 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log

 import java.io.File
 import java.nio._
-import java.nio.file.Paths
+import java.nio.file.{Files, Paths}
 import java.util.Properties

 import kafka.common._
@@ -39,7 +39,7 @@ import scala.collection._
  * Unit tests for the log cleaning logic
  */
 class LogCleanerTest extends JUnitSuite {
-
+
   val tmpdir = TestUtils.tempDir()
   val dir = TestUtils.randomPartitionLogDir(tmpdir)
   val logProps = new Properties()
@@ -50,12 +50,12 @@ class LogCleanerTest extends JUnitSuite {
   val logConfig = LogConfig(logProps)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
-
+
   @After
   def teardown(): Unit = {
     Utils.delete(tmpdir)
   }
-
+
   /**
    * Test simple log cleaning
    */
@@ -89,6 +89,66 @@ class LogCleanerTest extends JUnitSuite {
   }

   @Test
+  def testDuplicateCheckAfterCleaning(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
+    var log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val pid1 = 1
+    val pid2 = 2
+    val pid3 = 3
+    val pid4 = 4
+
+    appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+    appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
+    appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
+
+    log.roll()
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(2, 3, 3, 4, 1, 4), keysInLog(log))
+    assertEquals(List(1, 2, 3, 5, 6, 7), offsetsInLog(log))
+
+    // we have to reload the log to validate that the cleaner maintained sequence numbers correctly
+    def reloadLog(): Unit = {
+      log.close()
+      log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps), recoveryPoint = 0L)
+    }
+
+    reloadLog()
+
+    // check duplicate append from producer 1
+    var logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(2L, logAppendInfo.lastOffset)
+
+    // check duplicate append from producer 3
+    logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
+    assertEquals(6L, logAppendInfo.firstOffset)
+    assertEquals(7L, logAppendInfo.lastOffset)
+
+    // check duplicate append from producer 2
+    logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
+    assertEquals(3L, logAppendInfo.firstOffset)
+    assertEquals(5L, logAppendInfo.lastOffset)
+
+    // do one more append and a round of cleaning to force another deletion from producer 1's batch
+    appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
+    log.roll()
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(3, 3, 4, 1, 4, 2), keysInLog(log))
+    assertEquals(List(2, 3, 5, 6, 7, 8), offsetsInLog(log))
+
+    reloadLog()
+
+    // duplicate append from producer1 should still be fine
+    logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(2L, logAppendInfo.lastOffset)
+  }
+
+  @Test
   def testBasicTransactionAwareCleaning(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
@@ -974,8 +1034,8 @@ class LogCleanerTest extends JUnitSuite {
   private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)

-  private def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+  private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) =
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats)

   private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }
@@ -1006,23 +1066,25 @@ class LogCleanerTest extends JUnitSuite {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }

-  private def transactionalRecords(records: Seq[SimpleRecord],
-                                   producerId: Long,
-                                   producerEpoch: Short,
-                                   sequence: Int): MemoryRecords = {
-    MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, records: _*)
+  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
   }

-  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => Unit = {
+  private def appendIdempotentAsLeader(log: Log, producerId: Long,
+                                       producerEpoch: Short = 0,
+                                       isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>
         val keyBytes = key.toString.getBytes
-        new SimpleRecord(keyBytes, keyBytes) // the value doesn't matter too much since we validate offsets
+        new SimpleRecord(time.milliseconds(), keyBytes, keyBytes) // the value doesn't matter since we validate offsets
       }
-      val records = transactionalRecords(simpleRecords, producerId, producerEpoch, sequence)
-      log.appendAsLeader(records, leaderEpoch = 0)
+      val records = if (isTransactional)
+        MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+      else
+        MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
       sequence += simpleRecords.size
+      log.appendAsLeader(records, leaderEpoch = 0)
     }
   }
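For reference, the idempotent appends driven by the new appendIdempotentAsLeader helper boil down to
batches built with the MemoryRecords factory method used in the diff. A sketch with hypothetical
producer values:

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.SimpleRecord;

    public class IdempotentBatchSketch {
        public static void main(String[] args) {
            // Base sequence 0 for producerId 1; after cleaning, a re-send of this
            // exact batch should be recognized as a duplicate because the cleaned
            // log still carries the original base/last sequence numbers.
            MemoryRecords records = MemoryRecords.withIdempotentRecords(CompressionType.NONE,
                    1L, (short) 0, 0,
                    new SimpleRecord("1".getBytes(), "1".getBytes()),
                    new SimpleRecord("2".getBytes(), "2".getBytes()),
                    new SimpleRecord("3".getBytes(), "3".getBytes()));
            System.out.println(records.sizeInBytes());
        }
    }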
