This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f0a3960e3e1 KAFKA-17867 Consider using zero-copy for PushTelemetryRequest (#17622)
f0a3960e3e1 is described below

commit f0a3960e3e114228bd4d8b220812fc142f832b2b
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Oct 30 20:49:56 2024 +0800

    KAFKA-17867 Consider using zero-copy for PushTelemetryRequest (#17622)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../common/requests/PushTelemetryRequest.java      |  2 +-
 .../internals/ClientTelemetryReporter.java         |  5 +++--
 .../telemetry/internals/ClientTelemetryUtils.java  | 12 +++++------
 .../common/message/PushTelemetryRequest.json       |  2 +-
 .../common/requests/PushTelemetryRequestTest.java  |  7 +++---
 .../kafka/common/requests/RequestResponseTest.java |  2 +-
 .../internals/ClientTelemetryUtilsTest.java        |  6 +++---
 .../apache/kafka/server/ClientMetricsManager.java  |  7 +++---
 .../kafka/server/ClientMetricsManagerTest.java     | 25 +++++++++++-----------
 .../metrics/ClientMetricsReceiverPluginTest.java   |  3 ++-
 10 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
index 94e029d4db1..9264de0f59e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java
@@ -88,7 +88,7 @@ public class PushTelemetryRequest extends AbstractRequest {
     public ByteBuffer metricsData() {
         CompressionType cType = 
CompressionType.forId(this.data.compressionType());
         return (cType == CompressionType.NONE) ?
-            ByteBuffer.wrap(this.data.metrics()) : 
ClientTelemetryUtils.decompress(this.data.metrics(), cType);
+            this.data.metrics() : 
ClientTelemetryUtils.decompress(this.data.metrics(), cType);
     }
 
     public static PushTelemetryRequest parse(ByteBuffer buffer, short version) 
{
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 7dfaa2b0e50..91df6b8aac5 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -710,12 +711,12 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
             }
 
             CompressionType compressionType = 
ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes());
-            byte[] compressedPayload;
+            ByteBuffer compressedPayload;
             try {
                 compressedPayload = ClientTelemetryUtils.compress(payload, 
compressionType);
             } catch (IOException e) {
                 log.info("Failed to compress telemetry payload for 
compression: {}, sending uncompressed data", compressionType);
-                compressedPayload = payload.toByteArray();
+                compressedPayload = ByteBuffer.wrap(payload.toByteArray());
                 compressionType = CompressionType.NONE;
             }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index a943f85b38f..3c555afb3b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.kafka.common.utils.Utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -191,23 +190,22 @@ public class ClientTelemetryUtils {
         return CompressionType.NONE;
     }
 
-    public static byte[] compress(MetricsData metrics, CompressionType 
compressionType) throws IOException {
+    public static ByteBuffer compress(MetricsData metrics, CompressionType 
compressionType) throws IOException {
         try (ByteBufferOutputStream compressedOut = new 
ByteBufferOutputStream(512)) {
             Compression compression = Compression.of(compressionType).build();
             try (OutputStream out = compression.wrapForOutput(compressedOut, 
RecordBatch.CURRENT_MAGIC_VALUE)) {
                 metrics.writeTo(out);
             }
             compressedOut.buffer().flip();
-            return Utils.toArray(compressedOut.buffer());
+            return compressedOut.buffer();
         }
     }
 
-    public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
-        ByteBuffer data = ByteBuffer.wrap(metrics);
+    public static ByteBuffer decompress(ByteBuffer metrics, CompressionType 
compressionType) {
         Compression compression = Compression.of(compressionType).build();
-        try (InputStream in = compression.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+        try (InputStream in = compression.wrapForInput(metrics, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
             ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
-            byte[] bytes = new byte[data.capacity() * 2];
+            byte[] bytes = new byte[metrics.limit() * 2];
             int nRead;
             while ((nRead = in.read(bytes, 0, bytes.length)) != -1) {
                 out.write(bytes, 0, nRead);
diff --git a/clients/src/main/resources/common/message/PushTelemetryRequest.json b/clients/src/main/resources/common/message/PushTelemetryRequest.json
index b91cc7d94f7..dd39bbf1ce6 100644
--- a/clients/src/main/resources/common/message/PushTelemetryRequest.json
+++ b/clients/src/main/resources/common/message/PushTelemetryRequest.json
@@ -38,7 +38,7 @@
       "about": "Compression codec used to compress the metrics."
     },
     {
-      "name": "Metrics", "type": "bytes", "versions": "0+",
+      "name": "Metrics", "type": "bytes", "versions": "0+", "zeroCopy": true,
       "about": "Metrics encoded in OpenTelemetry MetricsData v1 protobuf 
format."
     }
   ]
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
index 6617a88d9d5..beed3e49102 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryRequestTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
 import org.apache.kafka.common.telemetry.internals.MetricKey;
 import org.apache.kafka.common.telemetry.internals.SinglePointMetric;
+import org.apache.kafka.common.utils.Utils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -70,12 +71,12 @@ public class PushTelemetryRequestTest {
     }
 
     private PushTelemetryRequest getPushTelemetryRequest(MetricsData 
metricsData, CompressionType compressionType) throws IOException {
-        byte[] compressedData = ClientTelemetryUtils.compress(metricsData, 
compressionType);
+        ByteBuffer compressedData = ClientTelemetryUtils.compress(metricsData, 
compressionType);
         byte[] data = metricsData.toByteArray();
         if (compressionType != CompressionType.NONE) {
-            assertTrue(compressedData.length < data.length);
+            assertTrue(compressedData.limit() < data.length);
         } else {
-            assertArrayEquals(compressedData, data);
+            assertArrayEquals(Utils.toArray(compressedData), data);
         }
 
         return new PushTelemetryRequest.Builder(
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a3206b60c9e..5de7f898d43 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -3872,7 +3872,7 @@ public class RequestResponseTest {
             .setSubscriptionId(1)
             .setTerminating(false)
             .setCompressionType(CompressionType.ZSTD.id)
-            .setMetrics("test-metrics".getBytes(StandardCharsets.UTF_8))
+            
.setMetrics(ByteBuffer.wrap("test-metrics".getBytes(StandardCharsets.UTF_8)))
         ).build(version);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
index 9f21c6c6fda..41679bed3f7 100644
--- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java
@@ -134,12 +134,12 @@ public class ClientTelemetryUtilsTest {
     public void testCompressDecompress(CompressionType compressionType) throws 
IOException {
         MetricsData metricsData = getMetricsData();
         byte[] raw = metricsData.toByteArray();
-        byte[] compressed = ClientTelemetryUtils.compress(metricsData, 
compressionType);
+        ByteBuffer compressed = ClientTelemetryUtils.compress(metricsData, 
compressionType);
         assertNotNull(compressed);
         if (compressionType != CompressionType.NONE) {
-            assertTrue(compressed.length < raw.length);
+            assertTrue(compressed.limit() < raw.length);
         } else {
-            assertArrayEquals(raw, compressed);
+            assertArrayEquals(raw, Utils.toArray(compressed));
         }
         ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
         assertNotNull(decompressed);
diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index d3e3c137ac2..ba26c9b4dae 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -59,6 +59,7 @@ import org.apache.kafka.server.util.timer.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -210,8 +211,8 @@ public class ClientMetricsManager implements AutoCloseable {
         }
 
         // Push the metrics to the external client receiver plugin.
-        byte[] metrics = request.data().metrics();
-        if (metrics != null && metrics.length > 0) {
+        ByteBuffer metrics = request.data().metrics();
+        if (metrics != null && metrics.limit() > 0) {
             try {
                 long exportTimeStartMs = time.hiResClockMs();
                 receiverPlugin.exportMetrics(requestContext, request);
@@ -428,7 +429,7 @@ public class ClientMetricsManager implements AutoCloseable {
             throw new UnsupportedCompressionTypeException(msg);
         }
 
-        if (request.data().metrics() != null && 
request.data().metrics().length > clientTelemetryMaxBytes) {
+        if (request.data().metrics() != null && 
request.data().metrics().limit() > clientTelemetryMaxBytes) {
             String msg = String.format("Telemetry request from [%s] is larger 
than the maximum allowed size [%s]",
                 request.data().clientInstanceId(), clientTelemetryMaxBytes);
             throw new TelemetryTooLargeException(msg);
diff --git a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index 7edb286ae1a..4cb4053505b 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -364,7 +365,7 @@ public class ClientMetricsManagerTest {
                             
.setClientInstanceId(response.data().clientInstanceId())
                             
.setSubscriptionId(response.data().subscriptionId())
                             .setCompressionType(CompressionType.NONE.id)
-                            
.setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
+                            
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
             PushTelemetryResponse pushResponse = 
newClientMetricsManager.processPushTelemetryRequest(
                     pushRequest, ClientMetricsTestUtils.requestContext());
@@ -559,7 +560,7 @@ public class ClientMetricsManagerTest {
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
                 .setCompressionType(CompressionType.NONE.id)
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), 
true).build();
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
         PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
             request, ClientMetricsTestUtils.requestContext());
@@ -603,7 +604,7 @@ public class ClientMetricsManagerTest {
                     new PushTelemetryRequestData()
                             
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                             
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
-                            
.setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
+                            
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
             PushTelemetryResponse response = 
newClientMetricsManager.processPushTelemetryRequest(
                     request, ClientMetricsTestUtils.requestContext());
@@ -640,7 +641,7 @@ public class ClientMetricsManagerTest {
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
                 .setCompressionType(CompressionType.NONE.id)
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), 
true).build();
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
         PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
             request, ClientMetricsTestUtils.requestContext());
@@ -688,7 +689,7 @@ public class ClientMetricsManagerTest {
             new PushTelemetryRequestData()
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), 
true).build();
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
         PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
             request, ClientMetricsTestUtils.requestContext());
@@ -726,7 +727,7 @@ public class ClientMetricsManagerTest {
             new PushTelemetryRequestData()
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), 
true).build();
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
         PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
             request, ClientMetricsTestUtils.requestContext());
@@ -738,7 +739,7 @@ public class ClientMetricsManagerTest {
             new PushTelemetryRequestData()
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8))
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8)))
                 .setTerminating(true), true).build();
 
         response = clientMetricsManager.processPushTelemetryRequest(
@@ -810,7 +811,7 @@ public class ClientMetricsManagerTest {
         PushTelemetryRequest request = new PushTelemetryRequest.Builder(
             new PushTelemetryRequestData()
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8))
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8)))
                 .setSubscriptionId(1234), true).build();
 
         PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
@@ -908,7 +909,7 @@ public class ClientMetricsManagerTest {
                     new PushTelemetryRequestData()
                             
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                             
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
-                            .setMetrics(metrics), true).build();
+                            .setMetrics(ByteBuffer.wrap(metrics)), 
true).build();
 
             // Set the max bytes 1 to force the error.
             PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
@@ -937,7 +938,7 @@ public class ClientMetricsManagerTest {
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
                 .setCompressionType(CompressionType.NONE.id)
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), 
true).build();
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
         CountDownLatch lock = new CountDownLatch(2);
         List<PushTelemetryResponse> responses = 
Collections.synchronizedList(new ArrayList<>());
@@ -1013,7 +1014,7 @@ public class ClientMetricsManagerTest {
                 
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                 
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
                 .setCompressionType(CompressionType.NONE.id)
-                .setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), 
true).build();
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
         clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
         assertEquals(1, clientMetricsManager.subscriptions().size());
@@ -1103,7 +1104,7 @@ public class ClientMetricsManagerTest {
                             
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
                             
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
                             .setCompressionType(CompressionType.NONE.id)
-                            
.setMetrics("test-data".getBytes(StandardCharsets.UTF_8)), true).build();
+                            
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
 
             PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
                     request, ClientMetricsTestUtils.requestContext());
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
index 35f9a41b47d..698e8720954 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -53,7 +54,7 @@ public class ClientMetricsReceiverPluginTest {
 
         byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8);
         
clientMetricsReceiverPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
-            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(metrics), true).build());
+            new PushTelemetryRequest.Builder(new 
PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build());
 
         assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
         assertEquals(1, telemetryReceiver.metricsData.size());

Reply via email to