kamalcph commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1323969945
##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -594,4 +575,16 @@ class RemoteIndexCacheTest {
timeIndex.flush()
}
}
+
+ private def getEstimateEntryBytesSize(): Long = {
+ val cacheForEstimate = new RemoteIndexCache(2L, rsm, tpDir.toString)
+ val tpIdForEstimate = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
+ val metadataListForEstimate = generateRemoteLogSegmentMetadata(size = 1,
tpIdForEstimate)
+ val entryForEstimate =
cacheForEstimate.getIndexEntry(metadataListForEstimate.head)
+ val estimateEntryBytesSize = entryForEstimate.entrySizeBytes()
+ Utils.closeQuietly(cacheForEstimate, "RemoteIndexCache created for
estimating entry size")
+ cleanup()
+ setup()
Review Comment:
My preference is to have a different partition directory to avoid using the
`setUp` and `cleanup` methods which are common to all the tests inside the test:
``` scala
private def estimateOneEntryBytesSize(): Long = {
val tp = new TopicPartition("estimate-entry-bytes-size", 0)
val tpId = new TopicIdPartition(Uuid.randomUuid(), tp)
val tpDir = new File(logDir, tpId.toString)
Files.createDirectory(tpDir.toPath)
val rsm = mock(classOf[RemoteStorageManager])
mockRsmFetchIndex(rsm)
val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
val entry = cache.getIndexEntry(metadataList.head)
val entrySizeInBytes = entry.entrySizeBytes()
Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry
size")
entrySizeInBytes
}
// We can reuse this method in the setup()
private def mockRsmFetchIndex(rsm: RemoteStorageManager): Unit = {
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]),
any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not
accessed.
}
})
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]