dengziming commented on code in PR #14175:
URL: https://github.com/apache/kafka/pull/14175#discussion_r1288503732
##########
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##########
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends
KafkaServerTestHarness {
assertTrue(lineIter.hasNext)
assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (",
lineIter.next())
val nodeApiVersions = NodeApiVersions.create
Review Comment:
We should change how we construct `NodeApiVersions` here, such as `val
nodeApiVersions = new
NodeApiVersions(clientApis.map(ApiVersionsResponse.toApiVersion).asJava,
Collections.emptyList(), false)`
##########
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##########
@@ -17,37 +17,47 @@
package kafka.admin
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-import scala.collection.Seq
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.message.ApiMessageType
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertTrue}
-import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
def generateConfigs: Seq[KafkaConfig] =
- TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
- // Configure control plane listener to make sure we have separate
listeners from client,
- // in order to avoid returning Envelope API version.
- props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
- props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
- props.setProperty("listeners",
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
- props.setProperty(KafkaConfig.AdvertisedListenersProp,
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
- props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
- props
- }).map(KafkaConfig.fromProps)
+ if (isKRaftTest()) {
+ TestUtils.createBrokerConfigs(1, null).map(props => {
+ props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
Review Comment:
Please add a comment explaining this change. I guess it is related to KIP-848,
and we can remove this setting once KIP-848 is complete.
##########
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##########
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends
KafkaServerTestHarness {
assertTrue(lineIter.hasNext)
assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (",
lineIter.next())
val nodeApiVersions = NodeApiVersions.create
- val enabledApis = ApiKeys.zkBrokerApis.asScala
- for (apiKey <- enabledApis) {
- val apiVersion = nodeApiVersions.apiVersion(apiKey)
- assertNotNull(apiVersion)
+ val listenerType = if (isKRaftTest()) {
+ ApiMessageType.ListenerType.BROKER
+ } else {
+ ApiMessageType.ListenerType.ZK_BROKER
+ }
+ val clientApis = ApiKeys.clientApis().asScala
+ for (apiKey <- clientApis) {
+ assertTrue(lineIter.hasNext)
+ val actual = lineIter.next()
+ if (apiKey.inScope(listenerType)) {
+ val apiVersion = nodeApiVersions.apiVersion(apiKey)
+ assertNotNull(apiVersion)
- val versionRangeStr =
- if (apiVersion.minVersion == apiVersion.maxVersion)
apiVersion.minVersion.toString
- else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
- val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
+ val versionRangeStr =
+ if (apiVersion.minVersion == apiVersion.maxVersion)
apiVersion.minVersion.toString
+ else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
+ val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
- val terminator = if (apiKey == enabledApis.last) "" else ","
+ val terminator = if (apiKey == clientApis.last) "" else ","
- val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable:
$usableVersion]$terminator"
- assertTrue(lineIter.hasNext)
- assertEquals(line, lineIter.next())
+ val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable:
$usableVersion]$terminator"
+ assertTrue(lineIter.hasNext)
+ assertEquals(line, actual)
+ }
Review Comment:
We should add an else branch to do similar assertions, such as:
```
val line = s"\t${apiKey.name}(${apiKey.id}): UNSUPPORTED,"
assertTrue(lineIter.hasNext)
assertEquals(line, actual)
```
##########
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##########
@@ -68,7 +68,7 @@ public static NodeApiVersions create() {
*/
public static NodeApiVersions create(Collection<ApiVersion> overrides) {
List<ApiVersion> apiVersions = new LinkedList<>(overrides);
- for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
+ for (ApiKeys apiKey : ApiKeys.clientApis()) {
Review Comment:
This method is used in so many places that we should be careful about this
change. We'd better keep it unchanged in this PR and make this change in a
separate patch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]