This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6f41dc5ebc KAFKA-17631 Convert SaslApiVersionsRequestTest to kraft (#18330)
f6f41dc5ebc is described below
commit f6f41dc5ebc0933210c90a520258f89900d67857
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Feb 3 21:01:38 2025 +0800
KAFKA-17631 Convert SaslApiVersionsRequestTest to kraft (#18330)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/SaslApiVersionsRequestTest.scala | 84 +++++++---------------
1 file changed, 26 insertions(+), 58 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 3a8042e2590..7f1f8b6aae3 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -16,81 +16,48 @@
*/
package kafka.server
-import kafka.api.SaslSetup
-import kafka.security.JaasTestUtils
-import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms}
-import org.apache.kafka.common.test.api.{ClusterConfig, ClusterTemplate, Type}
-import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.test.api.{ClusterTest, Type}
import org.apache.kafka.common.test.ClusterInstance
-import org.apache.kafka.network.SocketServerConfigs
+import org.apache.kafka.common.test.junit.ClusterTestExtensions
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled}
+import org.junit.jupiter.api.extension.ExtendWith
import java.net.Socket
import java.util.Collections
import scala.jdk.CollectionConverters._
-object SaslApiVersionsRequestTest {
- val kafkaClientSaslMechanism = "PLAIN"
- val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
- val controlPlaneListenerName = "CONTROL_PLANE"
- val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
-
- def saslApiVersionsRequestClusterConfig(): java.util.List[ClusterConfig] = {
- val saslServerProperties = new java.util.HashMap[String, String]()
-
saslServerProperties.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
kafkaClientSaslMechanism)
-
saslServerProperties.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
kafkaServerSaslMechanisms.mkString(","))
-
- val saslClientProperties = new java.util.HashMap[String, String]()
- saslClientProperties.put(SaslConfigs.SASL_MECHANISM,
kafkaClientSaslMechanism)
-
- // Configure control plane listener to make sure we have separate
listeners for testing.
- val serverProperties = new java.util.HashMap[String, String]()
-
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
- serverProperties.put("listeners",
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
- serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
-
- List(ClusterConfig.defaultBuilder
- .setBrokerSecurityProtocol(securityProtocol)
- .setTypes(Set(Type.KRAFT).asJava)
- .setSaslServerProperties(saslServerProperties)
- .setSaslClientProperties(saslClientProperties)
- .setServerProperties(serverProperties)
- .build()).asJava
- }
-}
-
-@Disabled("TODO: KAFKA-17631 - Convert SaslApiVersionsRequestTest to kraft")
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends
AbstractApiVersionsRequestTest(cluster) {
- private var sasl: SaslSetup = _
-
- @BeforeEach
- def setupSasl(): Unit = {
- sasl = new SaslSetup() {}
- sasl.startSasl(sasl.jaasSections(kafkaServerSaslMechanisms,
Some(kafkaClientSaslMechanism), JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
- }
- @ClusterTemplate("saslApiVersionsRequestClusterConfig")
+ @ClusterTest(types = Array(Type.KRAFT),
+ brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ )
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
try {
val apiVersionsResponse =
IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
- validateApiVersionsResponse(apiVersionsResponse,
+ validateApiVersionsResponse(
+ apiVersionsResponse,
enableUnstableLastVersion = !"false".equals(
-
cluster.config().serverProperties().get("unstable.api.versions.enable")))
+
cluster.config().serverProperties().get("unstable.api.versions.enable")),
+ apiVersion = 0.toShort
+ )
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}
- @ClusterTemplate("saslApiVersionsRequestClusterConfig")
+ @ClusterTest(types = Array(Type.KRAFT),
+ brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ )
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
try {
@@ -103,7 +70,10 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
}
}
- @ClusterTemplate("saslApiVersionsRequestClusterConfig")
+ @ClusterTest(types = Array(Type.KRAFT),
+ brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
+ controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
+ )
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
try {
@@ -112,20 +82,18 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVe
assertEquals(Errors.UNSUPPORTED_VERSION.code,
apiVersionsResponse.data.errorCode)
val apiVersionsResponse2 =
IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
- validateApiVersionsResponse(apiVersionsResponse2,
+ validateApiVersionsResponse(
+ apiVersionsResponse2,
enableUnstableLastVersion = !"false".equals(
-
cluster.config().serverProperties().get("unstable.api.versions.enable")))
+
cluster.config().serverProperties().get("unstable.api.versions.enable")),
+ apiVersion = 0.toShort
+ )
sendSaslHandshakeRequestValidateResponse(socket)
} finally {
socket.close()
}
}
- @AfterEach
- def closeSasl(): Unit = {
- sasl.closeSasl()
- }
-
private def sendSaslHandshakeRequestValidateResponse(socket: Socket): Unit =
{
val request = new SaslHandshakeRequest(new
SaslHandshakeRequestData().setMechanism("PLAIN"),
ApiKeys.SASL_HANDSHAKE.latestVersion)