divijvaidya commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1311338620


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -669,4 +705,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }
 
+    public static int estimatedEntrySize(Entry entry) {

Review Comment:
   This should return `long`: the sum of the individual `int` sizes can overflow an `int`.
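A minimal sketch of the difference. It uses a hypothetical `EntrySizeEstimate` helper that takes the three index sizes as plain numbers instead of the real `Entry`. Widening the first operand to `long` makes the whole sum 64-bit. The `int` version, by contrast, wraps around once the total passes `Integer.MAX_VALUE`:

```java
// Hypothetical helper illustrating the review suggestion: return long, not int.
final class EntrySizeEstimate {
    private EntrySizeEstimate() {}

    // Overflow-prone: int + int + int is evaluated in 32-bit arithmetic
    // and silently wraps when the total exceeds Integer.MAX_VALUE.
    static int estimatedEntrySizeAsInt(int offsetIndexBytes, int timeIndexBytes, int txnIndexBytes) {
        return offsetIndexBytes + timeIndexBytes + txnIndexBytes;
    }

    // Suggested: cast the first operand so every addition is done in long.
    // The txn index size is taken as long, since file sizes are reported as long.
    static long estimatedEntrySize(int offsetIndexBytes, int timeIndexBytes, long txnIndexBytes) {
        return (long) offsetIndexBytes + timeIndexBytes + txnIndexBytes;
    }

    public static void main(String[] args) {
        int big = Integer.MAX_VALUE;
        System.out.println(estimatedEntrySizeAsInt(big, big, 1)); // -1 (wrapped)
        System.out.println(estimatedEntrySize(big, big, 1L));     // 4294967295
    }
}
```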



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -463,6 +489,8 @@ public static class Entry implements AutoCloseable {
 
         private boolean markedForCleanup = false;
 
+        private int entrySize = 0;

Review Comment:
   Suggest renaming this to `entrySizeBytes`.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -64,7 +64,7 @@ class RemoteIndexCacheTest {
     rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
 
-    cache = new RemoteIndexCache(rsm, tpDir.toString)
+    cache = new RemoteIndexCache(Long.MaxValue, rsm, tpDir.toString)

Review Comment:
   `Long.MaxValue` is a very large size to initialize with and will cause an OOM. Can we set it to a smaller value here, such as 1 MB, that is sufficient for the number of entries we add in the tests?
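Here is an illustration of the sizing trade-off. When entries are weighed in bytes, eviction only happens once their summed size exceeds the budget. A budget of `Long.MaxValue` therefore means entries are effectively never evicted. The sketch below is a hypothetical, single-threaded stand-in (`ByteBoundedLru`) built on an access-ordered `LinkedHashMap`. It is not Caffeine, which the real cache uses. It is bounded by the 1 MB suggested above:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical single-threaded LRU bounded by total bytes rather than entry count.
// Illustration only; the real RemoteIndexCache delegates this to Caffeine.
final class ByteBoundedLru<K> {
    static final long ONE_MB = 1024L * 1024L;

    private final long maxBytes;
    private long currentBytes = 0L;
    // accessOrder = true: iteration starts at the least recently used key.
    private final LinkedHashMap<K, Long> sizes = new LinkedHashMap<>(16, 0.75f, true);

    ByteBoundedLru(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void put(K key, long sizeBytes) {
        Long previous = sizes.put(key, sizeBytes);
        if (previous != null) {
            currentBytes -= previous;
        }
        currentBytes += sizeBytes;
        // Evict least recently used entries until the byte budget is respected.
        Iterator<Map.Entry<K, Long>> it = sizes.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<K, Long> eldest = it.next();
            currentBytes -= eldest.getValue();
            it.remove();
        }
    }

    // A lookup counts as an access and moves the key to the most recently used end.
    boolean contains(K key) {
        return sizes.get(key) != null;
    }

    int size() {
        return sizes.size();
    }

    long currentBytes() {
        return currentBytes;
    }

    public static void main(String[] args) {
        ByteBoundedLru<String> cache = new ByteBoundedLru<>(ONE_MB);
        cache.put("a", 600 * 1024L);
        cache.put("b", 300 * 1024L);
        cache.contains("a");         // access "a", so "b" becomes least recently used
        cache.put("c", 300 * 1024L); // 1200 KiB > 1024 KiB, so "b" is evicted
        System.out.println(cache.currentBytes()); // 921600
    }
}
```

With a `Long.MAX_VALUE` budget, the `while` condition never holds, so every inserted entry stays resident.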



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -669,4 +705,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }
 
+    public static int estimatedEntrySize(Entry entry) {
+        return entry.offsetIndex.sizeInBytes() + entry.timeIndex.sizeInBytes() + (int) entry.txnIndex.file().length();

Review Comment:
   Please use https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#size-java.nio.file.Path- as it is more performant than calling `length()`.
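A minimal sketch of the suggestion, using a hypothetical `IndexFileSize.txnIndexSizeBytes` helper that takes the index's `java.io.File`. `Files.size(Path)` returns a `long`, which removes the need for the `(int)` cast. It also throws an `IOException` for a missing file, whereas `File.length()` silently returns `0L`:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper showing Files.size(Path) in place of File.length().
final class IndexFileSize {
    private IndexFileSize() {}

    // Size in bytes as a long: no narrowing cast to int is needed.
    // A missing file raises NoSuchFileException (an IOException) instead of reporting 0.
    static long txnIndexSizeBytes(File txnIndexFile) throws IOException {
        return Files.size(txnIndexFile.toPath());
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("txn-index", ".txnindex");
        try {
            Files.write(tmp, new byte[128]);
            System.out.println(txnIndexSizeBytes(tmp.toFile())); // 128
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```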



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -110,12 +110,12 @@ public class RemoteIndexCache implements Closeable {
      *
      * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
      */
-    private final Cache<Uuid, Entry> internalCache;
+    private Cache<Uuid, Entry> internalCache;
     private final RemoteStorageManager remoteStorageManager;
     private final ShutdownableThread cleanerThread;
 
     public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {
-        this(1024, remoteStorageManager, logDir);
+        this(1024L, remoteStorageManager, logDir);

Review Comment:
   please move this into a constant, DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES
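Here is a sketch of that refactor. It uses a hypothetical stand-in class, `SizeBoundedIndexCache`, in place of the real `RemoteIndexCache`, and omits the other constructor parameters. The `1024L` value is carried over from the diff as-is:

```java
// Hypothetical stand-in illustrating a named default instead of a magic number.
final class SizeBoundedIndexCache {
    // Named default replacing the bare 1024L literal in the delegating constructor.
    static final long DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES = 1024L;

    private final long maxSizeBytes;

    SizeBoundedIndexCache() {
        this(DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES);
    }

    SizeBoundedIndexCache(long maxSizeBytes) {
        this.maxSizeBytes = maxSizeBytes;
    }

    long maxSizeBytes() {
        return maxSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(new SizeBoundedIndexCache().maxSizeBytes()); // 1024
    }
}
```

Tests and the dynamic-config path can then refer to the same constant rather than repeating the literal.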



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1125,3 +1128,48 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 
 }
+
+class DynamicRemoteLogManagerConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {

Review Comment:
   Perhaps name it `DynamicRemoteLogConfig` so that we can add other remote storage related configs here. This would also align with the `RemoteLogConfig` class in LogConfig.java.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -406,9 +417,16 @@ class RemoteIndexCacheTest {
   @Test
   def testReloadCacheAfterClose(): Unit = {
     // close existing cache created in test setup before creating a new one
+    val cacheForEstimate = new RemoteIndexCache(2L, rsm, tpDir.toString)
+    val tpIdForEstimate = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataListForEstimate = generateRemoteLogSegmentMetadata(size = 1, tpIdForEstimate)
+    val entryForEstimate = cacheForEstimate.getIndexEntry(metadataListForEstimate.head)
+    val estimateEntryBytesSize = RemoteIndexCache.estimatedEntrySize(entryForEstimate)
+    Utils.closeQuietly(cacheForEstimate, "RemoteIndexCache created for estimating entry size")

Review Comment:
   Do we need to do all this? Instead, can't we initialize it with a size that is big enough for this test case?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -395,7 +418,11 @@ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteL
                 }
             });
 
-            return new Entry(offsetIndex, timeIndex, txnIndex);
+            Entry entry = new Entry(offsetIndex, timeIndex, txnIndex);
+            int entrySize = estimatedEntrySize(entry);
+            entry.setEntrySize(entrySize);

Review Comment:
   Instead of exposing a public function to set the entry size, perhaps it would be better to keep it private and calculate it in the constructor. I'm asking because there is no use case where an entity outside `Entry` would want to set the entry size based on its own calculation.
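A minimal sketch of that alternative, assuming each index can report its size in bytes. The `SizedIndex` interface and the simplified `Entry` below are hypothetical stand-ins for `OffsetIndex`, `TimeIndex`, and `TransactionIndex`. The field uses the `long` type and the `entrySizeBytes` name suggested in the earlier comments:

```java
// Simplified, hypothetical Entry: the size is computed once, privately, in the constructor.
final class Entry {
    private final SizedIndex offsetIndex;
    private final SizedIndex timeIndex;
    private final SizedIndex txnIndex;
    // Final and without a setter, so nothing outside Entry can overwrite the estimate.
    private final long entrySizeBytes;

    Entry(SizedIndex offsetIndex, SizedIndex timeIndex, SizedIndex txnIndex) {
        this.offsetIndex = offsetIndex;
        this.timeIndex = timeIndex;
        this.txnIndex = txnIndex;
        this.entrySizeBytes = estimatedEntrySize();
    }

    // Private: only Entry knows how its own size is derived.
    private long estimatedEntrySize() {
        return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + txnIndex.sizeInBytes();
    }

    long entrySizeBytes() {
        return entrySizeBytes;
    }

    public static void main(String[] args) {
        Entry entry = new Entry(() -> 4096L, () -> 2048L, () -> 128L);
        System.out.println(entry.entrySizeBytes()); // 6272
    }
}

// Hypothetical stand-in for the three index types; each reports its size in bytes.
interface SizedIndex {
    long sizeInBytes();
}
```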


