jsancio commented on code in PR #12963:
URL: https://github.com/apache/kafka/pull/12963#discussion_r1042809968


##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {
-    (0 to 10000).foreach { _ =>
+    (0 until 10000).foreach { _ =>
       listener.handleCommit(
         RecordTestUtils.mockBatchReader(
           endOffset,

Review Comment:
   This is probably beyond the scope of this PR but it is unfortunate that this 
test is creating a sequence of batches that are impossible with KRaft. I assume 
that we are generating this many record to trigger size-based snapshots.



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -248,28 +254,28 @@ class BrokerMetadataListenerTest {
       assertEquals(200L, listener.highestMetadataOffset)
 
       // Check that we generate at least one snapshot once we see enough 
records.
-      assertEquals(-1L, snapshotter.prevCommittedOffset)
-      generateManyRecords(listener, 1000L)
-      assertEquals(1000L, snapshotter.prevCommittedOffset)
-      assertEquals(1000L, snapshotter.activeSnapshotOffset)
-      snapshotter.activeSnapshotOffset = -1L
+      assertNull(snapshotter.prevSnapshotId)
+      generateManyRecords(listener, endOffset = 1000L)
+      assertEquals(new OffsetAndEpoch(1000L, 0), snapshotter.prevSnapshotId)
+      assertEquals(new OffsetAndEpoch(1000L, 0), snapshotter.activeSnapshotId)
+      snapshotter.activeSnapshotId = null
 
       // Test creating a new snapshot after publishing it.
       val publisher = new MockMetadataPublisher()
       listener.startPublishing(publisher).get()
-      generateManyRecords(listener, 2000L)
+      generateManyRecords(listener, endOffset = 2000L)
       listener.getImageRecords().get()
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      assertEquals(2000L, snapshotter.prevCommittedOffset)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.prevSnapshotId)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.activeSnapshotId)
 
       // Test how we handle the snapshotter returning false.
-      generateManyRecords(listener, 3000L)
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      generateManyRecords(listener, 4000L)
-      assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      snapshotter.activeSnapshotOffset = -1L
-      generateManyRecords(listener, 5000L)
-      assertEquals(5000L, snapshotter.activeSnapshotOffset)
+      generateManyRecords(listener, endOffset = 3000L)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.activeSnapshotId)
+      generateManyRecords(listener, endOffset = 4000L)
+      assertEquals(new OffsetAndEpoch(2000L, 0), snapshotter.activeSnapshotId)
+      snapshotter.activeSnapshotId = null
+      generateManyRecords(listener, endOffset = 5000L)
+      assertEquals(new OffsetAndEpoch(5000L, 0), snapshotter.activeSnapshotId)

Review Comment:
   Given my comments regarding `mockBatchReader`. I would expect these snapshot 
id comparisons to fail, no?



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -166,15 +172,15 @@ class BrokerMetadataListenerTest {
       image = newImage
     }
 
-    override def publishedOffset: Long = -1
+    override def publishedEndOffset: Long = -1
   }
 
   private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
   private val BAR_ID = Uuid.fromString("SzN5j0LvSEaRIJHrxfMAlg")
 
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {

Review Comment:
   I think there is some inconsistency here. This field is called `endOffset` 
and it is passed to `mockBatchReader`. `mockBatchReader` calls this parameter 
`lastOffset`. Kafka tends to make "endOffsets" exclusive while "lastOffsets" 
are inclusive.
   
   Looking at the implementation of `mockBatchReader`, `lastOffset` is treated 
as the inclusive last offset of the batch.



##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -1230,7 +1230,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(
       () => {
         brokers.forall { broker =>
-          val metadataOffset = 
broker.asInstanceOf[BrokerServer].metadataPublisher.publishedOffset
+          val metadataOffset = 
broker.asInstanceOf[BrokerServer].metadataPublisher.publishedEndOffset
           metadataOffset >= controllerOffset

Review Comment:
   `controllerOffset` is the last offset in the controller, inclusive. 
metadataOffset is exclusive. Maybe the best change is to have this line instead 
in line 1229:
   ```scala
   val controllerEndOffset = 
controllerServer.raftManager.replicatedLog.endOffset().offset
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to