This is an automated email from the ASF dual-hosted git repository.

edoardocomar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c93ec9a5fa MINOR: tidy up SocketServerMemoryPoolTest (#21873)
5c93ec9a5fa is described below

commit 5c93ec9a5fac0902abe14af1c359a2f0b1c2f338
Author: Edoardo Comar <[email protected]>
AuthorDate: Thu May 28 09:44:34 2026 +0100

    MINOR: tidy up SocketServerMemoryPoolTest (#21873)
    
    * moved to package org.apache.kafka.common
    * use IntegrationTestUtils and RequestUtils to create messages
    * made SocketServer MemoryPool accessible to Java unit test
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../SocketServerMemoryPoolTest.java                | 92 +++++-----------------
 .../main/scala/kafka/network/SocketServer.scala    |  2 +-
 2 files changed, 21 insertions(+), 73 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java
similarity index 54%
rename from 
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java
rename to 
clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java
index 2278718a1cc..e1b847b8896 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java
@@ -14,28 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients.producer;
+package org.apache.kafka.common;
 
-import kafka.network.SocketServer;
-import kafka.server.KafkaBroker;
-
-import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.ProduceRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestUtils;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.TestKitDefaults;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.server.IntegrationTestUtils;
 
 import java.io.EOFException;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -50,35 +48,25 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 })
 public class SocketServerMemoryPoolTest {
     @ClusterTest
-    public void testProduceRequestWithUnsupportedVersion(ClusterInstance 
clusterInstance) throws Exception {
-        short unsupportedVersion = Short.MAX_VALUE;
-        byte[] rawRequestBytes = buildRawRequest(
-                ApiKeys.PRODUCE.id,
-                unsupportedVersion,
-                /* correlationId */ 1,
-                /* clientId     */ "test-unsupported-version",
-                new byte[10000]
-        );
+    public void testRequestWithUnsupportedVersion(ClusterInstance 
clusterInstance) throws Exception {
+        RequestHeader header = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, Short.MAX_VALUE);
+        ByteBuffer buffer = RequestUtils.serialize(header.data(), 
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+        byte[] rawRequestBytes = buffer.array();
 
         sendAndAssert(clusterInstance, rawRequestBytes);
     }
 
     @ClusterTest
-    public void testProduceRequestWithCorruptBody(ClusterInstance 
clusterInstance) throws Exception {
-        short validVersion = 3;
-        byte[] corruptBody = new byte[10000];
-        for (int i = 0; i < corruptBody.length; i++) {
-            corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF) 
makes Schema.read() throw SchemaException.
+    public void testRequestWithCorruptBody(ClusterInstance clusterInstance) 
throws Exception {
+        RequestHeader header = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, 
ApiKeys.PRODUCE.latestVersion());
+        ByteBuffer buffer = RequestUtils.serialize(header.data(), 
header.headerVersion(), new ProduceRequestData(), header.apiVersion());
+        byte[] rawRequestBytes = buffer.array();
+
+        // corrupt body but leave header valid
+        assertTrue(rawRequestBytes.length > header.size(), "must have body 
bytes to corrupt");
+        for (int i = header.size(); i < rawRequestBytes.length; i++) {
+            rawRequestBytes[i] = (byte) 0xFF;
         }
-
-        byte[] rawRequestBytes = buildRawRequest(
-                ApiKeys.PRODUCE.id,
-                validVersion,
-                /* correlationId */ 2,
-                /* clientId     */ "test-corrupt-body",
-                corruptBody
-        );
-
         sendAndAssert(clusterInstance, rawRequestBytes);
     }
 
@@ -96,48 +84,8 @@ public class SocketServerMemoryPoolTest {
         assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
     }
 
-    // This test uses reflection to read the SocketServer memoryPool 
availableMemory.
-    // The metric "MemoryPoolAvailable" from Yammer Metrics default registry
-    // can be overwritten in a @ClusterTest as the registry is a singleton.
-    long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws 
Exception {
-        KafkaBroker broker = 
clusterInstance.aliveBrokers().values().iterator().next();
-        SocketServer socketServer = broker.socketServer();
-        Field memoryPoolField = 
socketServer.getClass().getDeclaredField("memoryPool");
-        memoryPoolField.setAccessible(true);
-        MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
-        return memoryPool.availableMemory();
-    }
-
-    /**
-     * Builds a raw Kafka request excluding the frame length
-     *
-     * <p>Wire layout:
-     * <pre>
-     *   4 bytes – frame length (payload size, not including these 4 bytes)
-     *
-     *   2 bytes – api_key
-     *   2 bytes – api_version
-     *   4 bytes – correlation_id
-     *   2 bytes – client_id string length
-     *   N bytes – client_id (UTF-8)
-     *   X bytes - request body
-     * </pre>
-     */
-    private static byte[] buildRawRequest(short apiKey, short apiVersion, int 
correlationId, String clientId, byte[] body) {
-        byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
-
-        // Header: api_key(2) + api_version(2) + correlation_id(4) + 
client_id_len(2) + client_id
-        int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
-        int payloadSize = headerSize + body.length;
-
-        ByteBuffer buf = ByteBuffer.allocate(payloadSize);
-        buf.putShort(apiKey);                          // api_key
-        buf.putShort(apiVersion);                      // api_version
-        buf.putInt(correlationId);                     // correlation_id
-        buf.putShort((short) clientIdBytes.length);    // client_id string 
length
-        buf.put(clientIdBytes);                        // client_id bytes
-        buf.put(body);                                 // request body 
(possibly empty / corrupt)
-        return buf.array();
+    private long getMemoryPoolAvailable(ClusterInstance clusterInstance) {
+        return 
clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer().memoryPool().availableMemory();
     }
 
     /*
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index f1658b5d647..973d586b010 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -98,7 +98,7 @@ class SocketServer(
   private val memoryPoolDepletedPercentMetricName = 
metrics.metricName("MemoryPoolAvgDepletedPercent", JSocketServer.METRICS_GROUP)
   private val memoryPoolDepletedTimeMetricName = 
metrics.metricName("MemoryPoolDepletedTimeTotal", JSocketServer.METRICS_GROUP)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, 
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
-  private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
+  private[network] val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, 
DataPlaneAcceptor]()
   val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, 
apiVersionManager.newRequestMetrics)

Reply via email to