This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6f41dc5ebc KAFKA-17631 Convert SaslApiVersionsRequestTest to kraft (#18330)
f6f41dc5ebc is described below

commit f6f41dc5ebc0933210c90a520258f89900d67857
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Feb 3 21:01:38 2025 +0800

    KAFKA-17631 Convert SaslApiVersionsRequestTest to kraft (#18330)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/SaslApiVersionsRequestTest.scala  | 84 +++++++---------------
 1 file changed, 26 insertions(+), 58 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 3a8042e2590..7f1f8b6aae3 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -16,81 +16,48 @@
   */
 package kafka.server
 
-import kafka.api.SaslSetup
-import kafka.security.JaasTestUtils
-import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
-import org.apache.kafka.common.test.api.{ClusterConfig, ClusterTemplate, Type}
-import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.test.api.{ClusterTest, Type}
 import org.apache.kafka.common.test.ClusterInstance
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.common.test.junit.ClusterTestExtensions
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
+import org.junit.jupiter.api.extension.ExtendWith
 
 import java.net.Socket
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-object SaslApiVersionsRequestTest {
-  val kafkaClientSaslMechanism = "PLAIN"
-  val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
-  val controlPlaneListenerName = "CONTROL_PLANE"
-  val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-
-  def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = {
-    val saslServerProperties = new java.util.HashMap[String, String]()
-    saslServerProperties.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism)
-    saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
-
-    val saslClientProperties = new java.util.HashMap[String, String]()
-    saslClientProperties.put(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism)
-
-    // Configure control plane listener to make sure we have separate listeners for testing.
-    val serverProperties =  new java.util.HashMap[String, String]()
-    serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
-    serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
-    serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
-
-    List(ClusterConfig.defaultBuilder
-      .setBrokerSecurityProtocol(securityProtocol)
-      .setTypes(Set(Type.KRAFT).asJava)
-      .setSaslServerProperties(saslServerProperties)
-      .setSaslClientProperties(saslClientProperties)
-      .setServerProperties(serverProperties)
-      .build()).asJava
-  }
-}
-
-@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
-  private var sasl: SaslSetup = _
-
-  @BeforeEach
-  def setupSasl(): Unit = {
-    sasl = new SaslSetup() {}
-    sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
-  }
 
-  @ClusterTemplate("saslApiVersionsRequestClusterConfig")
+  @ClusterTest(types = Array(Type.KRAFT),
+    brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+    controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  )
   def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
     val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
       val apiVersionsResponse = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse,
+      validateApiVersionsResponse(
+        apiVersionsResponse,
         enableUnstableLastVersion = !"false".equals(
-          cluster.config().serverProperties().get("unstable.api.versions.enable")))
+          cluster.config().serverProperties().get("unstable.api.versions.enable")),
+        apiVersion = 0.toShort
+      )
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
     }
   }
 
-  @ClusterTemplate("saslApiVersionsRequestClusterConfig")
+  @ClusterTest(types = Array(Type.KRAFT),
+    brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+    controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  )
   def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
     val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
@@ -103,7 +70,10 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
     }
   }
 
-  @ClusterTemplate("saslApiVersionsRequestClusterConfig")
+  @ClusterTest(types = Array(Type.KRAFT),
+    brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+    controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
+  )
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
     val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener())
     try {
@@ -112,20 +82,18 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
       assertEquals(Errors.UNSUPPORTED_VERSION.code, apiVersionsResponse.data.errorCode)
       val apiVersionsResponse2 = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
-      validateApiVersionsResponse(apiVersionsResponse2,
+      validateApiVersionsResponse(
+        apiVersionsResponse2,
         enableUnstableLastVersion = !"false".equals(
-          cluster.config().serverProperties().get("unstable.api.versions.enable")))
+          cluster.config().serverProperties().get("unstable.api.versions.enable")),
+        apiVersion = 0.toShort
+      )
       sendSaslHandshakeRequestValidateResponse(socket)
     } finally {
       socket.close()
     }
   }
 
-  @AfterEach
-  def closeSasl(): Unit = {
-    sasl.closeSasl()
-  }
-
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit = {
     val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN"),
       ApiKeys.SASL_HANDSHAKE.latestVersion)

Reply via email to