divijvaidya commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1311338620
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -669,4 +705,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
+ public static int estimatedEntrySize(Entry entry) {
Review Comment:
This should return `long`: the sum of the individual `int` sizes can exceed `Integer.MAX_VALUE`.
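A minimal sketch of the overflow, assuming plain numeric sizes in place of the real `Entry` and index types (the class and method names below are hypothetical). Adding in `int` arithmetic wraps around once the total passes `Integer.MAX_VALUE`, while widening the first operand to `long` keeps the exact total.

```java
/**
 * Hypothetical sketch: plain numbers stand in for the offset, time and
 * transaction index sizes of a RemoteIndexCache entry.
 */
final class EntrySizeOverflowSketch {

    // Mirrors the diff: int arithmetic silently wraps past Integer.MAX_VALUE.
    static int estimatedEntrySizeAsInt(int offsetIndexBytes, int timeIndexBytes, long txnIndexBytes) {
        return offsetIndexBytes + timeIndexBytes + (int) txnIndexBytes;
    }

    // Widening the first operand makes the whole sum evaluate in long.
    static long estimatedEntrySizeAsLong(int offsetIndexBytes, int timeIndexBytes, long txnIndexBytes) {
        return (long) offsetIndexBytes + timeIndexBytes + txnIndexBytes;
    }

    public static void main(String[] args) {
        int big = Integer.MAX_VALUE;
        System.out.println(estimatedEntrySizeAsInt(big, 1, 0L));  // -2147483648
        System.out.println(estimatedEntrySizeAsLong(big, 1, 0L)); // 2147483648
    }
}
```

Widening the first operand is enough, because Java then evaluates the rest of the left-to-right sum in `long`. The `(int)` cast on the transaction index length would also truncate a file larger than `Integer.MAX_VALUE` bytes before any addition happens.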
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -463,6 +489,8 @@ public static class Entry implements AutoCloseable {
private boolean markedForCleanup = false;
+ private int entrySize = 0;
Review Comment:
Suggested rename: `entrySizeBytes`.
##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -64,7 +64,7 @@ class RemoteIndexCacheTest {
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L))
- cache = new RemoteIndexCache(rsm, tpDir.toString)
+ cache = new RemoteIndexCache(Long.MaxValue, rsm, tpDir.toString)
Review Comment:
`Long.MaxValue` is a very large size to initialize the cache with and could cause an OOM.
Can we set it to a smaller value that is still sufficient for the number of entries we add in these tests, such as 1 MB?
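To illustrate why a finite bound matters, here is a hypothetical weight-bounded LRU built on `LinkedHashMap`. It is a stand-in for the Caffeine cache, not its API. With a 1 MB bound, inserting three 512 KB entries evicts the least recently used one; with `Long.MAX_VALUE` nothing is ever evicted, so retained memory is limited only by what the test inserts.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical weight-bounded LRU: keys are entry ids, values are entry sizes in bytes. */
final class WeightBoundedLruSketch {
    // 1 MB, the test bound suggested in the review comment.
    static final long TEST_CACHE_SIZE_BYTES = 1024L * 1024L;

    private final long maxWeightBytes;
    private long totalWeightBytes = 0L;
    // accessOrder = true: iteration starts from the least recently used entry.
    private final LinkedHashMap<String, Long> sizes = new LinkedHashMap<>(16, 0.75f, true);

    WeightBoundedLruSketch(long maxWeightBytes) {
        this.maxWeightBytes = maxWeightBytes;
    }

    void put(String key, long sizeBytes) {
        Long previous = sizes.put(key, sizeBytes);
        if (previous != null) {
            totalWeightBytes -= previous;
        }
        totalWeightBytes += sizeBytes;
        // Evict least recently used entries until the total is back within the bound.
        Iterator<Map.Entry<String, Long>> it = sizes.entrySet().iterator();
        while (totalWeightBytes > maxWeightBytes && it.hasNext()) {
            totalWeightBytes -= it.next().getValue();
            it.remove();
        }
    }

    int size() {
        return sizes.size();
    }

    long totalWeightBytes() {
        return totalWeightBytes;
    }
}
```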
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -669,4 +705,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
+ public static int estimatedEntrySize(Entry entry) {
+ return entry.offsetIndex.sizeInBytes() + entry.timeIndex.sizeInBytes()
+ (int) entry.txnIndex.file().length();
Review Comment:
Please use `Files.size(Path)`
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#size-java.nio.file.Path-
as it is more performant than calling `length()`.
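Besides the performance point, the two calls also differ in how they fail: `File.length()` returns `0L` when the file does not exist, while `Files.size(Path)` returns the size as a `long` and throws an `IOException` (a `NoSuchFileException`) instead. A small sketch, with a hypothetical class name and a plain `File` standing in for `entry.txnIndex.file()`:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/** Hypothetical helper comparing the two ways of reading the transaction index size. */
final class TxnIndexSizeSketch {

    // java.io.File#length(): returns 0L for a missing file, so the error is silent.
    static long sizeViaLength(File txnIndexFile) {
        return txnIndexFile.length();
    }

    // java.nio.file.Files#size(Path): returns the size as a long, or throws IOException.
    static long sizeViaNio(File txnIndexFile) throws IOException {
        return Files.size(txnIndexFile.toPath());
    }
}
```

Combined with returning `long` from `estimatedEntrySize`, this also makes the `(int)` cast unnecessary.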
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -110,12 +110,12 @@ public class RemoteIndexCache implements Closeable {
*
We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
*/
- private final Cache<Uuid, Entry> internalCache;
+ private Cache<Uuid, Entry> internalCache;
private final RemoteStorageManager remoteStorageManager;
private final ShutdownableThread cleanerThread;
public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
- this(1024, remoteStorageManager, logDir);
+ this(1024L, remoteStorageManager, logDir);
Review Comment:
Please move this into a constant, `DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES`.
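A sketch of the constant extraction, using a hypothetical, trimmed-down class in place of `RemoteIndexCache` (the `RemoteStorageManager` and `logDir` parameters are omitted); the `1024L` value is taken from the diff:

```java
/** Hypothetical stand-in for RemoteIndexCache, showing only the size parameter. */
final class RemoteIndexCacheDefaultsSketch {
    // Named default instead of a magic number in the delegating constructor.
    static final long DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES = 1024L;

    private final long maxSizeBytes;

    // Convenience constructor delegates with the named default.
    RemoteIndexCacheDefaultsSketch() {
        this(DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES);
    }

    RemoteIndexCacheDefaultsSketch(long maxSizeBytes) {
        this.maxSizeBytes = maxSizeBytes;
    }

    long maxSizeBytes() {
        return maxSizeBytes;
    }
}
```

Tests and callers can then refer to the constant instead of repeating the literal.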
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1125,3 +1128,48 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
override def reconfigurableConfigs: Set[String] =
ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
}
+
+class DynamicRemoteLogManagerConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
Review Comment:
Perhaps name it `DynamicRemoteLogConfig` so that we can add other remote-storage-related configs here. This would also align with the `RemoteLogConfig` subclass in `LogConfig.java`.
##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -406,9 +417,16 @@ class RemoteIndexCacheTest {
@Test
def testReloadCacheAfterClose(): Unit = {
// close existing cache created in test setup before creating a new one
+ val cacheForEstimate = new RemoteIndexCache(2L, rsm, tpDir.toString)
+ val tpIdForEstimate = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+ val metadataListForEstimate = generateRemoteLogSegmentMetadata(size = 1, tpIdForEstimate)
+ val entryForEstimate = cacheForEstimate.getIndexEntry(metadataListForEstimate.head)
+ val estimateEntryBytesSize = RemoteIndexCache.estimatedEntrySize(entryForEstimate)
+ Utils.closeQuietly(cacheForEstimate, "RemoteIndexCache created for estimating entry size")
Review Comment:
Do we need to do all this? Instead, can't we initialize the cache with a size that is large enough for this test case?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -395,7 +418,11 @@ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteL
}
});
- return new Entry(offsetIndex, timeIndex, txnIndex);
+ Entry entry = new Entry(offsetIndex, timeIndex, txnIndex);
+ int entrySize = estimatedEntrySize(entry);
+ entry.setEntrySize(entrySize);
Review Comment:
Instead of exposing a public function to set the entry size, perhaps it would be better to keep it private and calculate it in the constructor. I ask because there is no use case where anything outside `Entry` would want to set the entry size based on its own calculation.
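A minimal sketch of computing the size privately in the constructor. The `SizedIndex` interface and the `EntrySketch` class are hypothetical: the sketch assumes each index can report its size as a `long`, whereas in the diff the offset and time indexes return `int` and the transaction index size comes from its file.

```java
/** Hypothetical, simplified Entry that measures its own size once, at construction. */
final class EntrySketch {
    /** Hypothetical view of an index that can report its on-disk size in bytes. */
    interface SizedIndex {
        long sizeInBytes();
    }

    private final SizedIndex offsetIndex;
    private final SizedIndex timeIndex;
    private final SizedIndex txnIndex;
    // Computed once in the constructor; there is deliberately no setter.
    private final long entrySizeBytes;

    EntrySketch(SizedIndex offsetIndex, SizedIndex timeIndex, SizedIndex txnIndex) {
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.entrySizeBytes = estimateSizeBytes(offsetIndex, timeIndex, txnIndex);
    }

    // Private: only the entry itself decides how its size is measured.
    private static long estimateSizeBytes(SizedIndex offsetIndex, SizedIndex timeIndex, SizedIndex txnIndex) {
        return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + txnIndex.sizeInBytes();
    }

    long entrySizeBytes() {
        return entrySizeBytes;
    }
}
```

With the field `final` and no setter, the size cannot be changed after construction, which is the intent of the comment.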
--
This is an automated message from the Apache Git Service.