Repository: kafka Updated Branches: refs/heads/trunk d68f9e2fe -> bcaee7fe1
http://git-wip-us.apache.org/repos/asf/kafka/blob/bcaee7fe/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index ac1d623..3cc68ad 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -393,12 +393,12 @@ class ProducerStateManagerTest extends JUnitSuite { append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset)) append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true) - stateManager.evictUnretainedProducers(100) + stateManager.truncateHead(100) assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset)) } @Test - def testEvictUnretainedPids(): Unit = { + def testTruncateHead(): Unit = { val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L) @@ -411,8 +411,8 @@ class ProducerStateManagerTest extends JUnitSuite { stateManager.takeSnapshot() assertEquals(Set(2, 4), currentSnapshotOffsets) - stateManager.evictUnretainedProducers(2) - assertEquals(Set(4), currentSnapshotOffsets) + stateManager.truncateHead(2) + assertEquals(Set(2, 4), currentSnapshotOffsets) assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) assertEquals(None, stateManager.lastEntry(producerId)) @@ -420,18 +420,39 @@ class ProducerStateManagerTest extends JUnitSuite { assertTrue(maybeEntry.isDefined) assertEquals(3L, maybeEntry.get.lastOffset) - stateManager.evictUnretainedProducers(3) + stateManager.truncateHead(3) assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) assertEquals(Set(4), currentSnapshotOffsets) assertEquals(4, stateManager.mapEndOffset) - stateManager.evictUnretainedProducers(5) + stateManager.truncateHead(5) assertEquals(Set(), stateManager.activeProducers.keySet) assertEquals(Set(), currentSnapshotOffsets) assertEquals(5, stateManager.mapEndOffset) } @Test + def testLoadFromSnapshotRemovesNonRetainedProducers(): Unit = { + val epoch = 0.toShort + val pid1 = 1L + val pid2 = 2L + + append(stateManager, pid1, epoch, 0, 0L) + append(stateManager, pid2, epoch, 0, 1L) + stateManager.takeSnapshot() + assertEquals(2, stateManager.activeProducers.size) + + stateManager.truncateAndReload(1L, 2L, time.milliseconds()) + assertEquals(1, stateManager.activeProducers.size) + assertEquals(None, stateManager.lastEntry(pid1)) + + val entry = stateManager.lastEntry(pid2) + assertTrue(entry.isDefined) + assertEquals(0, entry.get.lastSeq) + assertEquals(1L, entry.get.lastOffset) + } + + @Test def testSkipSnapshotIfOffsetUnchanged(): Unit = { val epoch = 0.toShort append(stateManager, producerId, epoch, 0, 0L, 0L) http://git-wip-us.apache.org/repos/asf/kafka/blob/bcaee7fe/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a2c9b05..14f0114 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -352,13 +352,13 @@ object TestUtils extends Logging { def records(records: Iterable[SimpleRecord], magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE, codec: CompressionType = CompressionType.NONE, - pid: Long = RecordBatch.NO_PRODUCER_ID, - epoch: Short = RecordBatch.NO_PRODUCER_EPOCH, + producerId: Long = RecordBatch.NO_PRODUCER_ID, + producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH, sequence: Int = RecordBatch.NO_SEQUENCE, baseOffset: Long = 0L): MemoryRecords = { val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava)) val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset, - System.currentTimeMillis, pid, epoch, sequence) + System.currentTimeMillis, producerId, producerEpoch, sequence) records.foreach(builder.append) builder.build() }
