dajac commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r893692868
##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -73,18 +73,35 @@ class DefaultApiVersionManager(
) extends ApiVersionManager {
override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
+ val metadataVersion = metadataCache.metadataVersion()
val supportedFeatures = features.supportedFeatures
val finalizedFeatures = metadataCache.features()
val controllerApiVersions =
forwardingManager.flatMap(_.controllerApiVersions)
- ApiVersionsResponse.createApiVersionsResponse(
+ val response = ApiVersionsResponse.createApiVersionsResponse(
throttleTimeMs,
- metadataCache.metadataVersion().highestSupportedRecordVersion,
+ metadataVersion.highestSupportedRecordVersion,
supportedFeatures,
finalizedFeatures.features.map(kv => (kv._1,
kv._2.asInstanceOf[java.lang.Short])).asJava,
finalizedFeatures.epoch,
controllerApiVersions.orNull,
- listenerType)
+ listenerType
+ )
+
+ // In ZK mode if the deployed software of the controller uses version 2.8 or above
+ // but the IBP is below 2.8, the controller does not assign topic ids. In this case,
+ // it should not advertise the AlterPartition API version 2 and above.
+ val alterPartitionApiVersion =
response.apiVersion(ApiKeys.ALTER_PARTITION.id)
+ if (alterPartitionApiVersion != null) {
+ alterPartitionApiVersion.setMaxVersion(
+ if (metadataVersion.isTopicIdsSupported)
+ alterPartitionApiVersion.maxVersion()
+ else
+ 1.toShort
+ )
+ }
Review Comment:
I have filed a JIRA to track this:
https://issues.apache.org/jira/browse/KAFKA-13975
It seems to me that it would be better to do it separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]