This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46b474a9dee KAFKA-19239 Rewrite IntegrationTestUtils by java (#19776)
46b474a9dee is described below
commit 46b474a9dee4fc0c56ededf4adf44704372de79b
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Fri Jun 20 01:46:29 2025 +0800
KAFKA-19239 Rewrite IntegrationTestUtils by java (#19776)
This PR rewrites IntegrationTestUtils from Scala to Java.
## Changes:
- Converted all the existing Scala code in IntegrationTestUtils.scala
into Java in IntegrationTestUtils.java.
- Preserved the original logic and functionality to ensure backward
compatibility.
- Updated relevant imports and dependencies accordingly.
## Motivation:
The rewrite aims to standardize the codebase in Java, which aligns
better with the rest of the project and facilitates easier maintenance
by the Java-centric team.
Reviewers: TaiJuWu <[email protected]>, Ken Huang <[email protected]>,
PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../transaction/ProducerIntegrationTest.scala | 9 +-
.../kafka/server/IntegrationTestUtils.scala | 106 ---------------------
.../kafka/server/KRaftClusterTest.scala | 12 +--
.../server/AbstractApiVersionsRequestTest.scala | 5 +-
.../server/AllocateProducerIdsRequestTest.scala | 7 +-
.../kafka/server/ClientQuotasRequestTest.scala | 14 +--
.../kafka/server/DescribeQuorumRequestTest.scala | 19 +---
.../server/GroupCoordinatorBaseRequestTest.scala | 31 ++----
.../kafka/server/SaslApiVersionsRequestTest.scala | 8 +-
.../server/ShareFetchAcknowledgeRequestTest.scala | 1 +
.../server/ShareGroupHeartbeatRequestTest.scala | 8 +-
.../apache/kafka/server/IntegrationTestUtils.java | 84 ++++++++++++++++
.../apache/kafka/common/test/ClusterInstance.java | 7 ++
13 files changed, 133 insertions(+), 178 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
index d2be817e26d..75b1bba0e12 100644
---
a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala
@@ -18,7 +18,7 @@
package kafka.coordinator.transaction
import kafka.network.SocketServer
-import kafka.server.IntegrationTestUtils
+import org.apache.kafka.server.IntegrationTestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic, TransactionState}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{Producer, ProducerConfig,
ProducerRecord}
@@ -206,11 +206,8 @@ class ProducerIntegrationTest {
.setTransactionalId(null)
.setTransactionTimeoutMs(10)
val request = new InitProducerIdRequest.Builder(data).build()
-
- response =
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request,
- destination = broker,
- listenerName = listener)
-
+ val port = broker.boundPort(listener)
+ response =
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
shouldRetry = response.data.errorCode ==
Errors.COORDINATOR_LOAD_IN_PROGRESS.code
}
assertTrue(deadline.hasTimeLeft())
diff --git
a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
deleted file mode 100644
index a6ac36ab2ae..00000000000
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.network.SocketServer
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, ByteBufferAccessor}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
RequestHeader, ResponseHeader}
-import org.apache.kafka.common.utils.Utils
-
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
-import java.nio.ByteBuffer
-import scala.reflect.ClassTag
-
-object IntegrationTestUtils {
-
- def sendRequest(socket: Socket, request: Array[Byte]): Unit = {
- val outgoing = new DataOutputStream(socket.getOutputStream)
- outgoing.writeInt(request.length)
- outgoing.write(request)
- outgoing.flush()
- }
-
- private def sendWithHeader(request: AbstractRequest, header: RequestHeader,
socket: Socket): Unit = {
- val serializedBytes = Utils.toArray(request.serializeWithHeader(header))
- sendRequest(socket, serializedBytes)
- }
-
- def nextRequestHeader[T <: AbstractResponse](apiKey: ApiKeys,
- apiVersion: Short,
- clientId: String = "client-id",
- correlationIdOpt: Option[Int] =
None): RequestHeader = {
- val correlationId = correlationIdOpt.getOrElse {
- this.correlationId += 1
- this.correlationId
- }
- new RequestHeader(apiKey, apiVersion, clientId, correlationId)
- }
-
- def send(request: AbstractRequest,
- socket: Socket,
- clientId: String = "client-id",
- correlationId: Option[Int] = None): Unit = {
- val header = nextRequestHeader(request.apiKey, request.version, clientId,
correlationId)
- sendWithHeader(request, header, socket)
- }
-
- def receive[T <: AbstractResponse](socket: Socket, apiKey: ApiKeys, version:
Short)
- (implicit classTag: ClassTag[T]): T = {
- val incoming = new DataInputStream(socket.getInputStream)
- val len = incoming.readInt()
-
- val responseBytes = new Array[Byte](len)
- incoming.readFully(responseBytes)
-
- val responseBuffer = ByteBuffer.wrap(responseBytes)
- ResponseHeader.parse(responseBuffer, apiKey.responseHeaderVersion(version))
-
- AbstractResponse.parseResponse(apiKey, new
ByteBufferAccessor(responseBuffer), version) match {
- case response: T => response
- case response =>
- throw new ClassCastException(s"Expected response with type
${classTag.runtimeClass}, but found ${response.getClass}")
- }
- }
-
- def sendAndReceive[T <: AbstractResponse](request: AbstractRequest,
- socket: Socket,
- clientId: String = "client-id",
- correlationId: Option[Int] = None)
- (implicit classTag: ClassTag[T]): T
= {
- send(request, socket, clientId, correlationId)
- receive[T](socket, request.apiKey, request.version)
- }
-
- def connectAndReceive[T <: AbstractResponse](request: AbstractRequest,
- destination: SocketServer,
- listenerName: ListenerName)
- (implicit classTag:
ClassTag[T]): T = {
- val socket = connect(destination, listenerName)
- try sendAndReceive[T](request, socket)
- finally socket.close()
- }
-
- private var correlationId = 0
-
- def connect(socketServer: SocketServer,
- listenerName: ListenerName): Socket = {
- new Socket("localhost", socketServer.boundPort(listenerName))
- }
-}
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index ab44b0c7a0a..55911c1e796 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -18,8 +18,8 @@
package kafka.server
import kafka.network.SocketServer
-import kafka.server.IntegrationTestUtils.connectAndReceive
import kafka.utils.TestUtils
+import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
@@ -518,12 +518,10 @@ class KRaftClusterTest {
}
private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
- listenerName:
ListenerName): DescribeClusterResponse =
- connectAndReceive[DescribeClusterResponse](
- request = new DescribeClusterRequest.Builder(new
DescribeClusterRequestData()).build(),
- destination = destination,
- listenerName = listenerName
- )
+ listenerName:
ListenerName): DescribeClusterResponse = {
+ connectAndReceive[DescribeClusterResponse](new
DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
+ destination.boundPort(listenerName))
+ }
@Test
def testCreateClusterAndPerformReassignment(): Unit = {
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 704d810137f..bd587cda2a0 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest,
ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.IntegrationTestUtils
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion,
GroupVersion, MetadataVersion, ShareVersion, StreamsVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
@@ -41,12 +42,12 @@ abstract class AbstractApiVersionsRequestTest(cluster:
ClusterInstance) {
} else {
cluster.brokerSocketServers().asScala.head
}
- IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request,
socket, listenerName)
+ IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request,
socket.boundPort(listenerName))
}
def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest):
ApiVersionsResponse = {
val overrideHeader =
IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
- val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
+ val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
try {
val serializedBytes = Utils.toArray(
RequestUtils.serialize(overrideHeader.data,
overrideHeader.headerVersion, request.data, request.version))
diff --git
a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
index cb44719fad7..aa399985f83 100644
--- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala
@@ -17,12 +17,13 @@
package unit.kafka.server
import kafka.network.SocketServer
-import kafka.server.{BrokerServer, ControllerServer, IntegrationTestUtils}
+import kafka.server.{BrokerServer, ControllerServer}
import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults,
Type}
import org.apache.kafka.common.message.AllocateProducerIdsRequestData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests._
import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
import org.apache.kafka.server.common.ProducerIdsBlock
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
@@ -81,9 +82,7 @@ class AllocateProducerIdsRequestTest(cluster:
ClusterInstance) {
): AllocateProducerIdsResponse = {
IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
request,
- controllerSocketServer,
- cluster.controllerListenerName
+ controllerSocketServer.boundPort(cluster.controllerListenerName())
)
}
-
}
diff --git
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 8c30f749427..2e962f09dc8 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest,
AlterClientQuotasResponse, DescribeClientQuotasRequest,
DescribeClientQuotasResponse}
import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
import org.apache.kafka.server.config.QuotaConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Disabled
@@ -556,9 +557,9 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter):
DescribeClientQuotasResponse = {
val request = new DescribeClientQuotasRequest.Builder(filter).build()
-
IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](request,
- destination = cluster.anyBrokerSocketServer(),
- listenerName = cluster.clientListener())
+ IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](
+ request,
+ cluster.boundPorts().get(0))
}
private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String,
Option[Double]], validateOnly: Boolean) =
@@ -584,9 +585,8 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
private def sendAlterClientQuotasRequest(entries:
Iterable[ClientQuotaAlteration], validateOnly: Boolean):
AlterClientQuotasResponse = {
val request = new
AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
- IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](request,
- destination = cluster.anyBrokerSocketServer(),
- listenerName = cluster.clientListener())
+ IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](
+ request,
+ cluster.boundPorts().get(0))
}
-
}
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index 1e6d27320ca..ad1782adc65 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -19,12 +19,12 @@ package kafka.server
import org.apache.kafka.common.test.api.{ClusterTest, ClusterTestDefaults,
Type}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.DescribeQuorumRequest.singletonRequest
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
DescribeQuorumRequest, DescribeQuorumResponse}
+import org.apache.kafka.common.requests.{DescribeQuorumRequest,
DescribeQuorumResponse}
import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
@ClusterTestDefaults(types = Array(Type.KRAFT))
class DescribeQuorumRequestTest(cluster: ClusterInstance) {
@@ -35,7 +35,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
val request = new DescribeQuorumRequest.Builder(
singletonRequest(KafkaRaftServer.MetadataPartition)
).build(version.toShort)
- val response = connectAndReceive[DescribeQuorumResponse](request)
+ val response =
IntegrationTestUtils.connectAndReceive[DescribeQuorumResponse](request,
cluster.boundPorts().get(0))
assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
assertEquals("", response.data.errorMessage)
@@ -85,17 +85,4 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
}
}
}
-
- private def connectAndReceive[T <: AbstractResponse](
- request: AbstractRequest
- )(
- implicit classTag: ClassTag[T]
- ): T = {
- IntegrationTestUtils.connectAndReceive(
- request,
- cluster.brokerSocketServers().asScala.head,
- cluster.clientListener()
- )
- }
-
}
diff --git
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
index d66d34fddf9..da0215b6aaa 100644
---
a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
+import org.apache.kafka.server.IntegrationTestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import java.net.Socket
@@ -39,7 +40,7 @@ import java.util.stream.Collectors
import scala.collection.Seq
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
-import scala.reflect.ClassTag
+
class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
private def brokers(): Seq[KafkaBroker] =
cluster.brokers.values().stream().collect(Collectors.toList[KafkaBroker]).asScala.toSeq
@@ -900,42 +901,30 @@ class GroupCoordinatorBaseRequestTest(cluster:
ClusterInstance) {
}
protected def connectAny(): Socket = {
- val socket: Socket = IntegrationTestUtils.connect(
- cluster.anyBrokerSocketServer(),
- cluster.clientListener()
- )
+ val socket: Socket =
IntegrationTestUtils.connect(cluster.boundPorts().get(0))
openSockets += socket
socket
}
protected def connect(destination: Int): Socket = {
- val socket: Socket = IntegrationTestUtils.connect(
- brokerSocketServer(destination),
- cluster.clientListener()
- )
+ val socket =
IntegrationTestUtils.connect(brokerSocketServer(destination).boundPort(cluster.clientListener()))
openSockets += socket
socket
}
protected def connectAndReceive[T <: AbstractResponse](
request: AbstractRequest
- )(implicit classTag: ClassTag[T]): T = {
- IntegrationTestUtils.connectAndReceive[T](
- request,
- cluster.anyBrokerSocketServer(),
- cluster.clientListener()
- )
+ ): T = {
+ IntegrationTestUtils.connectAndReceive[T](request,
cluster.boundPorts().get(0))
}
protected def connectAndReceive[T <: AbstractResponse](
request: AbstractRequest,
destination: Int
- )(implicit classTag: ClassTag[T]): T = {
- IntegrationTestUtils.connectAndReceive[T](
- request,
- brokerSocketServer(destination),
- cluster.clientListener()
- )
+ ): T = {
+ val socketServer = brokerSocketServer(destination)
+ val listenerName = cluster.clientListener()
+ IntegrationTestUtils.connectAndReceive[T](request,
socketServer.boundPort(listenerName))
}
private def brokerSocketServer(brokerId: Int): SocketServer = {
diff --git
a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 711598306f4..4f63210f595 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -22,11 +22,11 @@ import
org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.test.api.{ClusterTest, Type}
import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.server.IntegrationTestUtils
import org.junit.jupiter.api.Assertions._
import java.net.Socket
import java.util.Collections
-import scala.jdk.CollectionConverters._
class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends
AbstractApiVersionsRequestTest(cluster) {
@@ -35,7 +35,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVe
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestBeforeSaslHandshakeRequest(): Unit = {
- val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
+ val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
try {
val apiVersionsResponse =
IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
new ApiVersionsRequest.Builder().build(0), socket)
@@ -56,7 +56,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVe
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestAfterSaslHandshakeRequest(): Unit = {
- val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
+ val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
try {
sendSaslHandshakeRequestValidateResponse(socket)
val response = IntegrationTestUtils.sendAndReceive[ApiVersionsResponse](
@@ -72,7 +72,7 @@ class SaslApiVersionsRequestTest(cluster: ClusterInstance)
extends AbstractApiVe
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT
)
def testApiVersionsRequestWithUnsupportedVersion(): Unit = {
- val socket =
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head,
cluster.clientListener())
+ val socket = IntegrationTestUtils.connect(cluster.boundPorts().get(0))
try {
val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0)
val apiVersionsResponse =
sendUnsupportedApiVersionRequest(apiVersionsRequest)
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index 39e960a7349..44e57c5518a 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.common.{TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.common.requests.{FindCoordinatorRequest,
FindCoordinatorResponse, ShareAcknowledgeRequest, ShareAcknowledgeResponse,
ShareFetchRequest, ShareFetchResponse, ShareGroupHeartbeatRequest,
ShareGroupHeartbeatResponse, ShareRequestMetadata}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.common.Feature
+import org.apache.kafka.server.IntegrationTestUtils
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, Timeout}
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 865870eef3b..390fb11be36 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -25,9 +25,11 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest,
ShareGroupHeartbeatResponse}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.common.Feature
+import org.apache.kafka.server.IntegrationTestUtils;
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals,
assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.Timeout
+
import java.util
import scala.jdk.CollectionConverters._
@@ -931,11 +933,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}
private def connectAndReceive(request: ShareGroupHeartbeatRequest):
ShareGroupHeartbeatResponse = {
- IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](
- request,
- cluster.anyBrokerSocketServer(),
- cluster.clientListener()
- )
+
IntegrationTestUtils.connectAndReceive[ShareGroupHeartbeatResponse](request,
cluster.boundPorts().get(0))
}
private def increasePartitions[B <: KafkaBroker](admin: Admin,
diff --git
a/server-common/src/test/java/org/apache/kafka/server/IntegrationTestUtils.java
b/server-common/src/test/java/org/apache/kafka/server/IntegrationTestUtils.java
new file mode 100644
index 00000000000..38e0ade67b3
--- /dev/null
+++
b/server-common/src/test/java/org/apache/kafka/server/IntegrationTestUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IntegrationTestUtils {
+
+ private static final AtomicInteger CORRELATION_ID = new AtomicInteger(0);
+
+ public static void sendRequest(Socket socket, byte[] request) throws
IOException {
+ DataOutputStream outgoing = new
DataOutputStream(socket.getOutputStream());
+ outgoing.writeInt(request.length);
+ outgoing.write(request);
+ outgoing.flush();
+ }
+
+ public static RequestHeader nextRequestHeader(ApiKeys apiKey, short
apiVersion) {
+ return new RequestHeader(apiKey, apiVersion, "client-id",
CORRELATION_ID.getAndIncrement());
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends AbstractResponse> T receive(Socket socket,
ApiKeys apiKey, short version) throws IOException, ClassCastException {
+ var incoming = new DataInputStream(socket.getInputStream());
+ int len = incoming.readInt();
+
+ var responseBytes = new byte[len];
+ incoming.readFully(responseBytes);
+
+ var responseBuffer = ByteBuffer.wrap(responseBytes);
+ ResponseHeader.parse(responseBuffer,
apiKey.responseHeaderVersion(version));
+
+ return (T) AbstractResponse.parseResponse(apiKey, new
ByteBufferAccessor(responseBuffer), version);
+ }
+
+ public static <T extends AbstractResponse> T sendAndReceive(
+ AbstractRequest request,
+ Socket socket
+ ) throws IOException {
+ var header = nextRequestHeader(request.apiKey(), request.version());
+ sendRequest(socket,
Utils.toArray(request.serializeWithHeader(header)));
+ return receive(socket, request.apiKey(), request.version());
+ }
+
+ public static <T extends AbstractResponse> T connectAndReceive(
+ AbstractRequest request,
+ int port
+ ) throws IOException {
+ try (Socket socket = connect(port)) {
+ return sendAndReceive(request, socket);
+ }
+ }
+
+ public static Socket connect(int port) throws IOException {
+ return new Socket("localhost", port);
+ }
+}
\ No newline at end of file
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 0b6f6b23b97..676adbf21e8 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -406,4 +406,11 @@ public interface ClusterInstance {
.orElseThrow(() -> new RuntimeException("Leader not found
for tp " + topicPartition));
}
}
+
+ default List<Integer> boundPorts() {
+ return brokers().values().stream()
+ .map(KafkaBroker::socketServer)
+ .map(s -> s.boundPort(clientListener()))
+ .toList();
+ }
}