Repository: kafka
Updated Branches:
  refs/heads/trunk d68f9e2fe -> bcaee7fe1


http://git-wip-us.apache.org/repos/asf/kafka/blob/bcaee7fe/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index ac1d623..3cc68ad 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -393,12 +393,12 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(stateManager, producerId, epoch, sequence, offset = 99, 
isTransactional = true)
     assertEquals(Some(99), 
stateManager.firstUnstableOffset.map(_.messageOffset))
     append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true)
-    stateManager.evictUnretainedProducers(100)
+    stateManager.truncateHead(100)
     assertEquals(Some(106), 
stateManager.firstUnstableOffset.map(_.messageOffset))
   }
 
   @Test
-  def testEvictUnretainedPids(): Unit = {
+  def testTruncateHead(): Unit = {
     val epoch = 0.toShort
 
     append(stateManager, producerId, epoch, 0, 0L)
@@ -411,8 +411,8 @@ class ProducerStateManagerTest extends JUnitSuite {
     stateManager.takeSnapshot()
     assertEquals(Set(2, 4), currentSnapshotOffsets)
 
-    stateManager.evictUnretainedProducers(2)
-    assertEquals(Set(4), currentSnapshotOffsets)
+    stateManager.truncateHead(2)
+    assertEquals(Set(2, 4), currentSnapshotOffsets)
     assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
     assertEquals(None, stateManager.lastEntry(producerId))
 
@@ -420,18 +420,39 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertTrue(maybeEntry.isDefined)
     assertEquals(3L, maybeEntry.get.lastOffset)
 
-    stateManager.evictUnretainedProducers(3)
+    stateManager.truncateHead(3)
     assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
     assertEquals(Set(4), currentSnapshotOffsets)
     assertEquals(4, stateManager.mapEndOffset)
 
-    stateManager.evictUnretainedProducers(5)
+    stateManager.truncateHead(5)
     assertEquals(Set(), stateManager.activeProducers.keySet)
     assertEquals(Set(), currentSnapshotOffsets)
     assertEquals(5, stateManager.mapEndOffset)
   }
 
   @Test
+  def testLoadFromSnapshotRemovesNonRetainedProducers(): Unit = {
+    val epoch = 0.toShort
+    val pid1 = 1L
+    val pid2 = 2L
+
+    append(stateManager, pid1, epoch, 0, 0L)
+    append(stateManager, pid2, epoch, 0, 1L)
+    stateManager.takeSnapshot()
+    assertEquals(2, stateManager.activeProducers.size)
+
+    stateManager.truncateAndReload(1L, 2L, time.milliseconds())
+    assertEquals(1, stateManager.activeProducers.size)
+    assertEquals(None, stateManager.lastEntry(pid1))
+
+    val entry = stateManager.lastEntry(pid2)
+    assertTrue(entry.isDefined)
+    assertEquals(0, entry.get.lastSeq)
+    assertEquals(1L, entry.get.lastOffset)
+  }
+
+  @Test
   def testSkipSnapshotIfOffsetUnchanged(): Unit = {
     val epoch = 0.toShort
     append(stateManager, producerId, epoch, 0, 0L, 0L)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bcaee7fe/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a2c9b05..14f0114 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -352,13 +352,13 @@ object TestUtils extends Logging {
   def records(records: Iterable[SimpleRecord],
               magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
               codec: CompressionType = CompressionType.NONE,
-              pid: Long = RecordBatch.NO_PRODUCER_ID,
-              epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+              producerId: Long = RecordBatch.NO_PRODUCER_ID,
+              producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
               sequence: Int = RecordBatch.NO_SEQUENCE,
               baseOffset: Long = 0L): MemoryRecords = {
     val buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, 
TimestampType.CREATE_TIME, baseOffset,
-      System.currentTimeMillis, pid, epoch, sequence)
+      System.currentTimeMillis, producerId, producerEpoch, sequence)
     records.foreach(builder.append)
     builder.build()
   }

Reply via email to