hudeqi commented on code in PR #14243: URL: https://github.com/apache/kafka/pull/14243#discussion_r1324144439
########## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ########## @@ -1129,3 +1132,50 @@ class DynamicProducerStateManagerConfig(val producerStateManagerConfig: Producer override def reconfigurableConfigs: Set[String] = ProducerStateManagerConfig.RECONFIGURABLE_CONFIGS.asScala } + +class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging { + override def reconfigurableConfigs: Set[String] = { + DynamicRemoteLogConfigs + } + + override def validateReconfiguration(newConfig: KafkaConfig): Unit = { + newConfig.values.forEach { (k, v) => + if (DynamicRemoteLogConfigs.contains(k)) { + val newValue = v.asInstanceOf[Long] + val oldValue = getValue(server.config, k) + if (newValue != oldValue) { + val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v" + if (newValue <= 0) + throw new ConfigException(s"$errorMsg, value should be at least 1") + } + } + } + } + + override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { + val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + if (oldValue != newValue) { + val remoteLogManager = server.remoteLogManager + if (remoteLogManager.nonEmpty) { + remoteLogManager.get.resizeCacheSize(newValue) + info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " + + s"old value: $oldValue, new value: $newValue") + } + } + } + + private def getValue(config: KafkaConfig, name: String): Long = { + name match { + case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP => + config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP) + case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n") + } + } +} + +object DynamicRemoteLogConfig { + val DynamicRemoteLogConfigs = Set( Review Comment: updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org