hudeqi commented on code in PR #14243:
URL: https://github.com/apache/kafka/pull/14243#discussion_r1324144439


##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1129,3 +1132,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer
   override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala
 
 }
+
+class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicRemoteLogConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    newConfig.values.forEach { (k, v) =>
+      if (DynamicRemoteLogConfigs.contains(k)) {
+        val newValue = v.asInstanceOf[Long]
+        val oldValue = getValue(server.config, k)
+        if (newValue != oldValue) {
+          val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
+          if (newValue <= 0)
+            throw new ConfigException(s"$errorMsg, value should be at least 1")
+        }
+      }
+    }
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+    if (oldValue != newValue) {
+      val remoteLogManager = server.remoteLogManager
+      if (remoteLogManager.nonEmpty) {
+        remoteLogManager.get.resizeCacheSize(newValue)
+        info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
+          s"old value: $oldValue, new value: $newValue")
+      }
+    }
+  }
+
+  private def getValue(config: KafkaConfig, name: String): Long = {
+    name match {
+      case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
+        config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
+      case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
+    }
+  }
+}
+
+object DynamicRemoteLogConfig {
+  val DynamicRemoteLogConfigs = Set(

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to