kamalcph commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1323939601


##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -700,4 +700,5 @@ class BrokerServer(
 
   override def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
 
+  override def remoteLogManager: Option[RemoteLogManager] = remoteLogManagerOpt

Review Comment:
   Why do we need this method?
   
   The `KafkaBroker` trait already has a [remoteLogManagerOpt](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaBroker.scala#L84) method.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -112,27 +115,54 @@ public class RemoteIndexCache implements Closeable {
      *
      * We use {@link Caffeine} cache instead of implementing a thread safe LRU cache on our own.
      */
-    private final Cache<Uuid, Entry> internalCache;
-    private final RemoteStorageManager remoteStorageManager;
-    private final ShutdownableThread cleanerThread;
+    private Cache<Uuid, Entry> internalCache;
 
     public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String logDir) throws IOException {

Review Comment:
   Can we remove this constructor? (it's used only by the test)
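   
   To picture the change, a sketch of what the test call might become once only the size-taking constructor remains (the constant chosen here is illustrative):
   
   ``` scala
   // Hypothetical test setup: pass the cache size explicitly instead of relying on the
   // two-argument convenience constructor.
   cache = new RemoteIndexCache(RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES, rsm, tpDir.toString)
   ```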



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -129,7 +129,7 @@ class KafkaServer(
 
   var logDirFailureChannel: LogDirFailureChannel = _
   @volatile private var _logManager: LogManager = _
-  var remoteLogManagerOpt: Option[RemoteLogManager] = None
+  @volatile var remoteLogManagerOpt: Option[RemoteLogManager] = None

Review Comment:
   Why do we have to mark this as `volatile`?
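   
   Just to frame the question: `@volatile` usually only matters when one thread writes the field and another reads it without other synchronization, for example (a hypothetical sketch, not code from the PR; `newSizeBytes` is illustrative):
   
   ``` scala
   // Startup thread publishes the manager once it is created:
   remoteLogManagerOpt = Some(remoteLogManager)
   
   // A dynamic-config handler thread may later read it:
   remoteLogManagerOpt.foreach(_.resizeCacheSize(newSizeBytes))
   ```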



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -89,7 +90,8 @@ object DynamicBrokerConfig {
     Set(KafkaConfig.MetricReporterClassesProp) ++
     DynamicListenerConfig.ReconfigurableConfigs ++
     SocketServer.ReconfigurableConfigs ++
-    ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
+    ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala ++
+    DynamicRemoteLogConfigs

Review Comment:
   Can we remove the static import and make the usage uniform for readability?
   
   ```
   DynamicRemoteLogConfig.ReconfigurableConfigs
   ```



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1129,3 +1132,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 
 }
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicRemoteLogConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.forEach { (k, v) =>
+      if (DynamicRemoteLogConfigs.contains(k)) {
+        val newValue = v.asInstanceOf[Long]
+        val oldValue = getValue(server.config, k)
+        if (newValue != oldValue) {
+          val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
+          if (newValue <= 0)
+            throw new ConfigException(s"$errorMsg, value should be at least 1")
+        }
+      }
+    }
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    if (oldValue != newValue) {
+      val remoteLogManager = server.remoteLogManager
+      if (remoteLogManager.nonEmpty) {
+        remoteLogManager.get.resizeCacheSize(newValue)
+        info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
+          s"old value: $oldValue, new value: $newValue")
+      }
+    }
+  }
+
+  private def getValue(config: KafkaConfig, name: String): Long = {
+    name match {
+      case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
+        config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+      case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
+    }
+  }
+}
+
+object DynamicRemoteLogConfig {
+  val DynamicRemoteLogConfigs = Set(

Review Comment:
   Can we follow a uniform naming strategy for readability?
   
   Rename `DynamicRemoteLogConfigs` -> `ReconfigurableConfigs`
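   
   For instance, a sketch of the renamed object (assuming the set holds only the index-cache size property, which is the only key handled elsewhere in this diff):
   
   ``` scala
   object DynamicRemoteLogConfig {
     // Matches DynamicListenerConfig.ReconfigurableConfigs and SocketServer.ReconfigurableConfigs naming.
     val ReconfigurableConfigs: Set[String] = Set(
       RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
     )
   }
   ```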



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -230,7 +231,7 @@ class RemoteIndexCacheTest {
     // close existing cache created in test setup before creating a new one
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
 
-    cache = new RemoteIndexCache(2, rsm, tpDir.toString)
+    cache = new RemoteIndexCache(2L, rsm, tpDir.toString)

Review Comment:
   Why is this `2L` rather than `2 * estimateEntryBytesSize`?
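   
   For comparison, the alternative being asked about would read something like this (sketch only; presumably a budget of two entries rather than two bytes):
   
   ``` scala
   cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString)
   ```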



##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##########
@@ -197,7 +197,7 @@ public final class RemoteLogManagerConfig {
                                   atLeast(0),
                                  LOW,
                                  REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
-                  .defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+                  .define(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
                                  LONG,
                                  DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,

Review Comment:
   `DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES` still refers to 1 GB.
   
   What is the difference between the `DEFAULT_REMOTE_INDEX_CACHE_SIZE_BYTES` and `DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES` defaults?



##########
core/src/main/scala/kafka/server/KafkaBroker.scala:
##########
@@ -95,6 +95,7 @@ trait KafkaBroker extends Logging {
   def brokerTopicStats: BrokerTopicStats
   def credentialProvider: CredentialProvider
   def clientToControllerChannelManager: NodeToControllerChannelManager
+  def remoteLogManager: Option[RemoteLogManager]

Review Comment:
   This is not required; please reuse the `remoteLogManagerOpt` method.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,6 +526,50 @@ class RemoteIndexCacheTest {
     }
   }
 
+  @Test
+  def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      Files.walk(cache.cacheDir())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
+    cache = new RemoteIndexCache(rsm, tpDir.toString)

Review Comment:
   Why do we reinitialise the cache?



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -594,4 +575,16 @@ class RemoteIndexCacheTest {
       timeIndex.flush()
     }
   }
+
+  private def getEstimateEntryBytesSize(): Long = {
+    val cacheForEstimate = new RemoteIndexCache(2L, rsm, tpDir.toString)
+    val tpIdForEstimate = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataListForEstimate = generateRemoteLogSegmentMetadata(size = 1, tpIdForEstimate)
+    val entryForEstimate = cacheForEstimate.getIndexEntry(metadataListForEstimate.head)
+    val estimateEntryBytesSize = entryForEstimate.entrySizeBytes()
+    Utils.closeQuietly(cacheForEstimate, "RemoteIndexCache created for estimating entry size")
+    cleanup()
+    setup()

Review Comment:
   My preference is to use a different partition directory, so the test does not have to call the `setUp` and `cleanup` methods (which are shared by all the tests) from inside its body:
   
   ``` scala
   private def estimateOneEntryBytesSize(): Long = {
       val tp = new TopicPartition("estimate-entry-bytes-size", 0)
       val tpId = new TopicIdPartition(Uuid.randomUuid(), tp)
       val tpDir = new File(logDir, tpId.toString)
       Files.createDirectory(tpDir.toPath)
       val rsm = mock(classOf[RemoteStorageManager])
       mockRsmFetchIndex(rsm)
       val cache = new RemoteIndexCache(2L, rsm, tpDir.toString)
       val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
       val entry = cache.getIndexEntry(metadataList.head)
       val estimateEntryBytesSize = entry.entrySizeBytes()
       Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size")
       estimateEntryBytesSize
   }
   
   // We can reuse this method in the setup()
   private def mockRsmFetchIndex(rsm: RemoteStorageManager): Unit = {
     when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
         val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
         val timeIdx = createTimeIndexForSegmentMetadata(metadata)
         val txnIdx = createTxIndexForSegmentMetadata(metadata)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
           case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
           case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
           case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
         }
       })
   }
   ```


