This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46b474a9dee KAFKA-19239 Rewrite IntegrationTestUtils by java (#19776)
46b474a9dee is described below

commit 46b474a9dee4fc0c56ededf4adf44704372de79b
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Fri Jun 20 01:46:29 2025 +0800

    KAFKA-19239 Rewrite IntegrationTestUtils by java (#19776)
    
    This PR rewrites IntegrationTestUtils from Scala to Java.
    
    ## Changes:
    
    - Converted all the existing Scala code in IntegrationTestUtils.scala
    into Java in IntegrationTestUtils.java.
    - Preserved the original logic and functionality to ensure backward
    compatibility.
    - Updated relevant imports and dependencies accordingly.
    
    ## Motivation:
    
    The rewrite aims to standardize the codebase in Java, which aligns
    better with the rest of the project and facilitates easier maintenance
    by the Java-centric team.
    
    Reviewers: TaiJuWu <[email protected]>, Ken Huang <[email protected]>,
     PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../transaction/ProducerIntegrationTest.scala      |   9 +-
 .../kafka/server/IntegrationTestUtils.scala        | 106 ---------------------
 .../kafka/server/KRaftClusterTest.scala            |  12 +--
 .../server/AbstractApiVersionsRequestTest.scala    |   5 +-
 .../server/AllocateProducerIdsRequestTest.scala    |   7 +-
 .../kafka/server/ClientQuotasRequestTest.scala     |  14 +--
 .../kafka/server/DescribeQuorumRequestTest.scala   |  19 +---
 .../server/GroupCoordinatorBaseRequestTest.scala   |  31 ++----
 .../kafka/server/SaslApiVersionsRequestTest.scala  |   8 +-
 .../server/ShareFetchAcknowledgeRequestTest.scala  |   1 +
 .../server/ShareGroupHeartbeatRequestTest.scala    |   8 +-
 .../apache/kafka/server/IntegrationTestUtils.java  |  84 ++++++++++++++++
 .../apache/kafka/common/test/ClusterInstance.java  |   7 ++
 13 files changed, 133 insertions(+), 178 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
index d2be817e26d..75b1bba0e12 100644
--- 
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
@@ -18,7 +18,7 @@
 package kafka.coordinator.transaction
 
 import kafka.network.SocketServer
-import kafka.server.IntegrationTestUtils
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
ConsumerRecords, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{Producer, ProducerConfig, 
ProducerRecord}
@@ -206,11 +206,8 @@ class ProducerIntegrationTest {
         .setTransactionalId(null)
         .setTransactionTimeoutMs(10)
       val request = new InitProducerIdRequest.Builder(data).build()
-
-      response = 
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
-        destination = broker,
-        listenerName = listener)
-
+      val port = broker.boundPort(listener)
+      response = 
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
       shouldRetry = response.data.errorCode == 
Errors.COORDINATOR_LOAD_IN_PROGRESS.code
     }
     assertTrue(deadline.hasTimeLeft())
diff --git 
a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala 
b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
deleted file mode 100644
index a6ac36ab2ae..00000000000
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.network.SocketServer
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
RequestHeader, ResponseHeader}
-import org.apache.kafka.common.utils.Utils
-
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
-import java.nio.ByteBuffer
-import scala.reflect.ClassTag
-
-object IntegrationTestUtils {
-
-  def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
-    val outgoing = new DataOutputStream(socket.getOutputStream)
-    outgoing.writeInt(request.length)
-    outgoing.write(request)
-    outgoing.flush()
-  }
-
-  private def sendWithHeader(request: AbstractRequest, header: RequestHeader, 
socket: Socket): Unit = {
-    val serializedBytes = Utils.toArray(request.serializeWithHeader(header))
-    sendRequest(socket, serializedBytes)
-  }
-
-  def nextRequestHeader[T <: AbstractResponse](apiKey: ApiKeys,
-                                               apiVersion: Short,
-                                               clientId: String = "client-id",
-                                               correlationIdOpt: Option[Int] = 
None): RequestHeader = {
-    val correlationId = correlationIdOpt.getOrElse {
-      this.correlationId += 1
-      this.correlationId
-    }
-    new RequestHeader(apiKey, apiVersion, clientId, correlationId)
-  }
-
-  def send(request: AbstractRequest,
-           socket: Socket,
-           clientId: String = "client-id",
-           correlationId: Option[Int] = None): Unit = {
-    val header = nextRequestHeader(request.apiKey, request.version, clientId, 
correlationId)
-    sendWithHeader(request, header, socket)
-  }
-
-  def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version: 
Short)
-                                    (implicit classTag: ClassTag[T]): T = {
-    val incoming = new DataInputStream(socket.getInputStream)
-    val len = incoming.readInt()
-
-    val responseBytes = new Array[Byte](len)
-    incoming.readFully(responseBytes)
-
-    val responseBuffer = ByteBuffer.wrap(responseBytes)
-    ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version))
-
-    AbstractResponse.parseResponse(apiKey, new 
ByteBufferAccessor(responseBuffer), version) match {
-      case response: T => response
-      case response =>
-        throw new ClassCastException(s"Expected response with type 
${classTag.runtimeClass}, but found ${response.getClass}")
-    }
-  }
-
-  def sendAndReceive[T <: AbstractResponse](request: AbstractRequest,
-                                            socket: Socket,
-                                            clientId: String = "client-id",
-                                            correlationId: Option[Int] = None)
-                                           (implicit classTag: ClassTag[T]): T 
= {
-    send(request, socket, clientId, correlationId)
-    receive[T](socket, request.apiKey, request.version)
-  }
-
-  def connectAndReceive[T <: AbstractResponse](request: AbstractRequest,
-                                               destination: SocketServer,
-                                               listenerName: ListenerName)
-                                              (implicit classTag: 
ClassTag[T]): T = {
-    val socket = connect(destination, listenerName)
-    try sendAndReceive[T](request, socket)
-    finally socket.close()
-  }
-
-  private var correlationId = 0
-
-  def connect(socketServer: SocketServer,
-              listenerName: ListenerName): Socket = {
-    new Socket("localhost", socketServer.boundPort(listenerName))
-  }
-}
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index ab44b0c7a0a..55911c1e796 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -18,8 +18,8 @@
 package kafka.server
 
 import kafka.network.SocketServer
-import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.utils.TestUtils
+import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -518,12 +518,10 @@ class KRaftClusterTest {
   }
 
   private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
-                                                    listenerName: 
ListenerName): DescribeClusterResponse =
-    connectAndReceive[DescribeClusterResponse](
-      request = new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()).build(),
-      destination = destination,
-      listenerName = listenerName
-    )
+                                                    listenerName: 
ListenerName): DescribeClusterResponse = {
+    connectAndReceive[DescribeClusterResponse](new 
DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
+      destination.boundPort(listenerName))
+  }
 
   @Test
   def testCreateClusterAndPerformReassignment(): Unit = {
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 704d810137f..bd587cda2a0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, 
GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
@@ -41,12 +42,12 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     } else {
       cluster.brokerSocketServers().asScala.head
     }
-    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, 
socket, listenerName)
+    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, 
socket.boundPort(listenerName))
   }
 
   def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): 
ApiVersionsResponse = {
     val overrideHeader = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
-    val socket = 
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, 
cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       val serializedBytes = Utils.toArray(
         RequestUtils.serialize(overrideHeader.data, 
overrideHeader.headerVersion, request.data, request.version))
diff --git 
a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index cb44719fad7..aa399985f83 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -17,12 +17,13 @@
 package unit.kafka.server
 
 import kafka.network.SocketServer
-import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
+import kafka.server.{BrokerServer, ControllerServer}
 import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, 
Type}
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.server.common.ProducerIdsBlock
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 
@@ -81,9 +82,7 @@ class AllocateProducerIdsRequestTest(cluster: 
ClusterInstance) {
   ): AllocateProducerIdsResponse = {
     IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
       request,
-      controllerSocketServer,
-      cluster.controllerListenerName
+      controllerSocketServer.boundPort(cluster.controllerListenerName())
     )
   }
-
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 8c30f749427..2e962f09dc8 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.requests.{AlterClientQuotasRequest, 
AlterClientQuotasResponse, DescribeClientQuotasRequest, 
DescribeClientQuotasResponse}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.apache.kafka.server.config.QuotaConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Disabled
@@ -556,9 +557,9 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
 
   private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter): 
DescribeClientQuotasResponse = {
     val request = new DescribeClientQuotasRequest.Builder(filter).build()
-    
IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](request,
-      destination = cluster.anyBrokerSocketServer(),
-      listenerName = cluster.clientListener())
+    IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](
+      request,
+      cluster.boundPorts().get(0))
   }
 
   private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String, 
Option[Double]], validateOnly: Boolean) =
@@ -584,9 +585,8 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
 
   private def sendAlterClientQuotasRequest(entries: 
Iterable[ClientQuotaAlteration], validateOnly: Boolean): 
AlterClientQuotasResponse = {
     val request = new 
AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
-    IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](request,
-      destination = cluster.anyBrokerSocketServer(),
-      listenerName = cluster.clientListener())
+    IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](
+      request,
+      cluster.boundPorts().get(0))
   }
-
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index 1e6d27320ca..ad1782adc65 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -19,12 +19,12 @@ package kafka.server
 import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults, 
Type}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.common.requests.{DescribeQuorumRequest, 
DescribeQuorumResponse}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions._
 
 import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
 
 @ClusterTestDefaults(types = Array(Type.KRAFT))
 class DescribeQuorumRequestTest(cluster: ClusterInstance) {
@@ -35,7 +35,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
       val request = new DescribeQuorumRequest.Builder(
         singletonRequest(KafkaRaftServer.MetadataPartition)
       ).build(version.toShort)
-      val response = connectAndReceive[DescribeQuorumResponse](request)
+      val response = 
IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request, 
cluster.boundPorts().get(0))
 
       assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
       assertEquals("", response.data.errorMessage)
@@ -85,17 +85,4 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
       }
     }
   }
-
-  private def connectAndReceive[T <: AbstractResponse](
-    request: AbstractRequest
-  )(
-    implicit classTag: ClassTag[T]
-  ): T = {
-    IntegrationTestUtils.connectAndReceive(
-      request,
-      cluster.brokerSocketServers().asScala.head,
-      cluster.clientListener()
-    )
-  }
-
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index d66d34fddf9..da0215b6aaa 100644
--- 
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.common.utils.ProducerIdAndEpoch
 import 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
 
 import java.net.Socket
@@ -39,7 +40,7 @@ import java.util.stream.Collectors
 import scala.collection.Seq
 import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
+
 
 class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
   private def brokers(): Seq[KafkaBroker] = 
cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).asScala.toSeq
@@ -900,42 +901,30 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
   }
 
   protected def connectAny(): Socket = {
-    val socket: Socket = IntegrationTestUtils.connect(
-      cluster.anyBrokerSocketServer(),
-      cluster.clientListener()
-    )
+    val socket: Socket = 
IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     openSockets += socket
     socket
   }
 
   protected def connect(destination: Int): Socket = {
-    val socket: Socket = IntegrationTestUtils.connect(
-      brokerSocketServer(destination),
-      cluster.clientListener()
-    )
+    val socket = 
IntegrationTestUtils.connect(brokerSocketServer(destination).boundPort(cluster.clientListener()))
     openSockets += socket
     socket
   }
 
   protected def connectAndReceive[T <: AbstractResponse](
     request: AbstractRequest
-  )(implicit classTag: ClassTag[T]): T = {
-    IntegrationTestUtils.connectAndReceive[T](
-      request,
-      cluster.anyBrokerSocketServer(),
-      cluster.clientListener()
-    )
+  ): T = {
+    IntegrationTestUtils.connectAndReceive[T](request, 
cluster.boundPorts().get(0))
   }
 
   protected def connectAndReceive[T <: AbstractResponse](
     request: AbstractRequest,
     destination: Int
-  )(implicit classTag: ClassTag[T]): T = {
-    IntegrationTestUtils.connectAndReceive[T](
-      request,
-      brokerSocketServer(destination),
-      cluster.clientListener()
-    )
+  ): T = {
+    val socketServer = brokerSocketServer(destination)
+    val listenerName = cluster.clientListener()
+    IntegrationTestUtils.connectAndReceive[T](request, 
socketServer.boundPort(listenerName))
   }
 
   private def brokerSocketServer(brokerId: Int): SocketServer = {
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 711598306f4..4f63210f595 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -22,11 +22,11 @@ import 
org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.test.api.{ClusterTest, Type}
 import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions._
 
 import java.net.Socket
 import java.util.Collections
-import scala.jdk.CollectionConverters._
 
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
 
@@ -35,7 +35,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVe
     controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
   )
   def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
-    val socket = 
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, 
cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       val apiVersionsResponse = 
IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
         new ApiVersionsRequest.Builder().build(0), socket)
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVe
     controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
   )
   def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
-    val socket = 
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, 
cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       sendSaslHandshakeRequestValidateResponse(socket)
       val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
@@ -72,7 +72,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVe
     controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
   )
   def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
-    val socket = 
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, 
cluster.clientListener())
+    val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
     try {
       val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
       val apiVersionsResponse = 
sendUnsupportedApiVersionRequest(apiVersionsRequest)
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 39e960a7349..44e57c5518a 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.{TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.common.requests.{FindCoordinatorRequest, 
FindCoordinatorResponse, ShareAcknowledgeRequest, ShareAcknowledgeResponse, 
ShareFetchRequest, ShareFetchResponse, ShareGroupHeartbeatRequest, 
ShareGroupHeartbeatResponse, ShareRequestMetadata}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.server.common.Feature
+import org.apache.kafka.server.IntegrationTestUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, Timeout}
 
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 865870eef3b..390fb11be36 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -25,9 +25,11 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest, 
ShareGroupHeartbeatResponse}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.server.common.Feature
+import org.apache.kafka.server.IntegrationTestUtils;
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertNotNull, assertNull, assertTrue}
 import org.junit.jupiter.api.Timeout
 
+
 import java.util
 import scala.jdk.CollectionConverters._
 
@@ -931,11 +933,7 @@ class ShareGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   }
 
   private def connectAndReceive(request: ShareGroupHeartbeatRequest): 
ShareGroupHeartbeatResponse = {
-    IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](
-      request,
-      cluster.anyBrokerSocketServer(),
-      cluster.clientListener()
-    )
+    
IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request, 
cluster.boundPorts().get(0))
   }
 
   private def increasePartitions[B <: KafkaBroker](admin: Admin,
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/IntegrationTestUtils.java 
b/server-common/src/test/java/org/apache/kafka/server/IntegrationTestUtils.java
new file mode 100644
index 00000000000..38e0ade67b3
--- /dev/null
+++ 
b/server-common/src/test/java/org/apache/kafka/server/IntegrationTestUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IntegrationTestUtils {
+
+    private static final AtomicInteger CORRELATION_ID = new AtomicInteger(0);
+
+    public static void sendRequest(Socket socket, byte[] request) throws 
IOException {
+        DataOutputStream outgoing = new 
DataOutputStream(socket.getOutputStream());
+        outgoing.writeInt(request.length);
+        outgoing.write(request);
+        outgoing.flush();
+    }
+
+    public static RequestHeader nextRequestHeader(ApiKeys apiKey, short 
apiVersion) {
+        return new RequestHeader(apiKey, apiVersion, "client-id", 
CORRELATION_ID.getAndIncrement());
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends AbstractResponse> T receive(Socket socket, 
ApiKeys apiKey, short version) throws IOException, ClassCastException {
+        var incoming = new DataInputStream(socket.getInputStream());
+        int len = incoming.readInt();
+
+        var responseBytes = new byte[len];
+        incoming.readFully(responseBytes);
+
+        var responseBuffer = ByteBuffer.wrap(responseBytes);
+        ResponseHeader.parse(responseBuffer, 
apiKey.responseHeaderVersion(version));
+
+        return (T) AbstractResponse.parseResponse(apiKey, new 
ByteBufferAccessor(responseBuffer), version);
+    }
+
+    public static <T extends AbstractResponse> T sendAndReceive(
+        AbstractRequest request,
+        Socket socket
+    ) throws IOException {
+        var header = nextRequestHeader(request.apiKey(), request.version());
+        sendRequest(socket, 
Utils.toArray(request.serializeWithHeader(header)));
+        return receive(socket, request.apiKey(), request.version());
+    }
+
+    public static <T extends AbstractResponse> T connectAndReceive(
+        AbstractRequest request,
+        int port
+    ) throws IOException {
+        try (Socket socket = connect(port)) {
+            return sendAndReceive(request, socket);
+        }
+    }
+
+    public static Socket connect(int port) throws IOException {
+        return new Socket("localhost", port);
+    }
+}
\ No newline at end of file
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 0b6f6b23b97..676adbf21e8 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -406,4 +406,11 @@ public interface ClusterInstance {
                     .orElseThrow(() -> new RuntimeException("Leader not found 
for tp " + topicPartition));
         }
     }
+
+    default List<Integer> boundPorts() {
+        return brokers().values().stream()
+                .map(KafkaBroker::socketServer)
+                .map(s -> s.boundPort(clientListener()))
+                .toList();
+    }
 }

Reply via email to