[
https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Gustafson resolved KAFKA-13417.
-------------------------------------
Resolution: Fixed
> Dynamic thread pool re-configurations may not get processed
> -----------------------------------------------------------
>
> Key: KAFKA-13417
> URL: https://issues.apache.org/jira/browse/KAFKA-13417
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Major
>
> `DynamicBrokerConfig.updateCurrentConfig` includes the following logic to
> update the current configuration and to let each `Reconfigurable` process the
> update:
> {code}
> val oldConfig = currentConfig
> val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
> if (newConfig ne currentConfig) {
>   currentConfig = newConfig
>   kafkaConfig.updateCurrentConfig(newConfig)
>   // Process BrokerReconfigurable updates after current config is updated
>   brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
> }
> {code}
> The problem here is that `currentConfig` is initialized to `kafkaConfig`,
> so on the first update `oldConfig` refers to the `kafkaConfig` object
> itself, and the call to `kafkaConfig.updateCurrentConfig(newConfig)` ends up
> mutating it. This matters because some of the `reconfigure` implementations
> only apply a new configuration if the value in `oldConfig` does not match
> the value in `newConfig`. For example, here is the logic to update thread
> pools dynamically:
> {code}
> override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
>   if (newConfig.numIoThreads != oldConfig.numIoThreads)
>     server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
>   if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
>     server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
>   if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
>     server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
>   if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
>     server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
>   if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
>     server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
> }
> {code}
> Because of this, the dynamic update will not be applied the first time it is
> made. I believe subsequent updates would work correctly, though, because by
> then we would have lost the indirect reference to `kafkaConfig`. Other than
> the `DynamicThreadPool` configurations, it looks like the config to update
> unclean leader election may also be affected by this bug.
> NOTE: This bug only affects KRaft, which is missing the call to
> `DynamicBrokerConfig.initialize()`.
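For readers who want to see the aliasing effect in isolation, here is a minimal sketch in plain Scala. It is not the Kafka code: `Config`, `DynamicConfig`, `delegate`, `resizes` and `run` are hypothetical stand-ins, and the sketch assumes that `KafkaConfig` serves reads from whichever config was last passed to `updateCurrentConfig`, as the description above implies. Only one setting (`numIoThreads`) is modelled.

```scala
object AliasingSketch {
  // Hypothetical stand-in for KafkaConfig (assumption: reads are delegated
  // to the config most recently passed to updateCurrentConfig).
  final class Config(private val ioThreads: Int) {
    private var delegate: Config = this

    def updateCurrentConfig(newConfig: Config): Unit = {
      delegate = newConfig
    }

    def numIoThreads: Int =
      if (delegate eq this) ioThreads else delegate.numIoThreads
  }

  // Hypothetical stand-in for DynamicBrokerConfig, reduced to the update path.
  final class DynamicConfig(kafkaConfig: Config) {
    // Same object as kafkaConfig until the first update replaces it.
    private var currentConfig: Config = kafkaConfig

    // Records every pool size that was actually applied.
    var resizes: Vector[Int] = Vector.empty

    def update(newConfig: Config): Unit = {
      val oldConfig = currentConfig
      if (newConfig ne currentConfig) {
        currentConfig = newConfig
        kafkaConfig.updateCurrentConfig(newConfig)
        // On the first call oldConfig *is* kafkaConfig, which now reads
        // through to newConfig, so this comparison sees equal values.
        if (newConfig.numIoThreads != oldConfig.numIoThreads)
          resizes :+= newConfig.numIoThreads
      }
    }
  }

  // Applies two updates and returns the pool sizes that took effect.
  def run(): Vector[Int] = {
    val dynamic = new DynamicConfig(new Config(8))
    dynamic.update(new Config(16)) // first update: silently skipped
    dynamic.update(new Config(32)) // second update: applied
    dynamic.resizes
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this model only the second update is recorded, which matches the behaviour described above: the first dynamic change is lost and later ones go through, because after the first update `oldConfig` no longer points at the mutable `kafkaConfig` object.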
--
This message was sent by Atlassian Jira
(v8.20.1#820001)