dengziming commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1226040921
##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -141,24 +136,22 @@ class DefaultApiVersionManager(
val enabledApis = ApiKeys.apisForListener(listenerType).asScala
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
- val supportedFeatures = features.supportedFeatures
+ val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeatures = metadataCache.features()
val controllerApiVersions =
forwardingManager.flatMap(_.controllerApiVersions)
ApiVersionsResponse.createApiVersionsResponse(
throttleTimeMs,
- metadataCache.metadataVersion().highestSupportedRecordVersion,
+ finalizedFeatures.metadataVersion().highestSupportedRecordVersion,
supportedFeatures,
- finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
- finalizedFeatures.epoch,
+ finalizedFeatures.finalizedFeatures(),
+ finalizedFeatures.finalizedFeaturesEpoch(),
controllerApiVersions.orNull,
listenerType,
enableUnstableLastVersion,
zkMigrationEnabled
)
}
- override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
- throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")
- }
+ override def features: Features = metadataCache.features()
Review Comment:
It seems this method is not used.
##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -326,6 +329,9 @@ class ControllerServer(
// register this instance for dynamic config changes to the KafkaConfig
config.dynamicConfig.addReconfigurables(this)
+ // Set up the metadata version publisher.
Review Comment:
nit: metadata version publisher -> feature publisher
##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -132,7 +127,7 @@ class SimpleApiVersionManager(
class DefaultApiVersionManager(
val listenerType: ListenerType,
forwardingManager: Option[ForwardingManager],
- features: BrokerFeatures,
+ brokerFeatures: BrokerFeatures,
Review Comment:
We should update the class docs to make them consistent with the field name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]