This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 4c2d86f4114 MINOR: Fixing client telemetry validate request (#19959)
4c2d86f4114 is described below

commit 4c2d86f411440aeaebcf6c1755bdeb690dfdb35a
Author: Apoorv Mittal <apoorvmitta...@gmail.com>
AuthorDate: Thu Jun 12 22:52:50 2025 +0100

    MINOR: Fixing client telemetry validate request (#19959)
    
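    Minor fix to correct the validation condition for
    GetTelemetrySubscriptions requests. The throttling check combined the two
    lastKnownError() comparisons with `||`, which is always true; it now uses
    `&&`, so a request is not throttled when the last known error is
    UNKNOWN_SUBSCRIPTION_ID or UNSUPPORTED_COMPRESSION_TYPE.
    Added corresponding tests as well.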
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../apache/kafka/server/ClientMetricsManager.java  |   2 +-
 .../kafka/server/ClientMetricsManagerTest.java     | 110 ++++++++++++++++++++-
 2 files changed, 110 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java 
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index b143466574c..d7343098b4e 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -391,7 +391,7 @@ public class ClientMetricsManager implements AutoCloseable {
         ClientMetricsInstance clientInstance, long timestamp) {
 
         if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && 
(clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID
-            || clientInstance.lastKnownError() != 
Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
+            && clientInstance.lastKnownError() != 
Errors.UNSUPPORTED_COMPRESSION_TYPE)) {
             
clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId());
             String msg = String.format("Request from the client [%s] arrived 
before the next push interval time",
                 request.data().clientInstanceId());
diff --git 
a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index 3b0bd181b91..c5b13eefbe4 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -826,7 +826,6 @@ public class ClientMetricsManagerTest {
         assertEquals((double) 0, 
getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + 
"-count").metricValue());
         assertEquals(Double.NaN, 
getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"-avg").metricValue());
         assertEquals(Double.NaN, 
getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + 
"-max").metricValue());
-
     }
 
     @Test
@@ -1124,6 +1123,115 @@ public class ClientMetricsManagerTest {
         }
     }
 
+    @Test
+    public void 
testGetTelemetrySubscriptionAfterPushTelemetryUnknownSubscriptionSucceeds() 
throws Exception {
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+
+        GetTelemetrySubscriptionsRequest subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData(), true).build();
+
+        GetTelemetrySubscriptionsResponse subscriptionsResponse = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+
+        Properties properties = new Properties();
+        properties.put("interval.ms", "100");
+        clientMetricsManager.updateSubscription("sub-2", properties);
+        assertEquals(2, clientMetricsManager.subscriptions().size());
+
+        PushTelemetryRequest request = new Builder(
+            new PushTelemetryRequestData()
+                
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
+                
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
+                .setCompressionType(CompressionType.NONE.id)
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
+
+        PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
+            request, ClientMetricsTestUtils.requestContext());
+
+        assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error());
+        ClientMetricsInstance instance = 
clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
+        assertNotNull(instance);
+        assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, 
instance.lastKnownError());
+
+        subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
+            new 
GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()),
 true).build();
+        subscriptionsResponse = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+        assertEquals(Errors.NONE, subscriptionsResponse.error());
+    }
+
+    @Test
+    public void 
testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucceeds() 
throws Exception {
+        clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultTestProperties());
+        assertEquals(1, clientMetricsManager.subscriptions().size());
+
+        GetTelemetrySubscriptionsRequest subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData(), true).build();
+
+        GetTelemetrySubscriptionsResponse subscriptionsResponse = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+
+        PushTelemetryRequest request = new Builder(
+            new PushTelemetryRequestData()
+                
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
+                
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
+                .setCompressionType((byte) 10) // Invalid compression type
+                
.setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), 
true).build();
+
+        PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
+            request, ClientMetricsTestUtils.requestContext());
+
+        assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error());
+        ClientMetricsInstance instance = 
clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
+        assertNotNull(instance);
+        assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, 
instance.lastKnownError());
+
+        subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
+            new 
GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()),
 true).build();
+        subscriptionsResponse = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+        assertEquals(Errors.NONE, subscriptionsResponse.error());
+    }
+
+    @Test
+    public void 
testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws 
Exception {
+        try (
+            Metrics kafkaMetrics = new Metrics();
+            ClientMetricsManager clientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
+        ) {
+            GetTelemetrySubscriptionsRequest subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
+                new GetTelemetrySubscriptionsRequestData(), true).build();
+
+            GetTelemetrySubscriptionsResponse subscriptionsResponse = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+                subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+
+            byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8);
+            assertEquals(2, metrics.length);
+
+            PushTelemetryRequest request = new PushTelemetryRequest.Builder(
+                new PushTelemetryRequestData()
+                    
.setClientInstanceId(subscriptionsResponse.data().clientInstanceId())
+                    
.setSubscriptionId(subscriptionsResponse.data().subscriptionId())
+                    .setMetrics(ByteBuffer.wrap(metrics)), true).build();
+
+            // The manager was created with max bytes set to 1, so the 2-byte payload forces the error.
+            PushTelemetryResponse response = 
clientMetricsManager.processPushTelemetryRequest(
+                request, ClientMetricsTestUtils.requestContext());
+
+            assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error());
+            ClientMetricsInstance instance = 
clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId());
+            assertNotNull(instance);
+            assertEquals(Errors.TELEMETRY_TOO_LARGE, 
instance.lastKnownError());
+
+            subscriptionsRequest = new 
GetTelemetrySubscriptionsRequest.Builder(
+                new 
GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()),
 true).build();
+            subscriptionsResponse = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+                subscriptionsRequest, ClientMetricsTestUtils.requestContext());
+            assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, 
subscriptionsResponse.error());
+        }
+    }
+
     @Test
     public void testCacheEviction() throws Exception {
         Properties properties = new Properties();

Reply via email to