This is an automated email from the ASF dual-hosted git repository. mittal pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push: new 4c2d86f4114 MINOR: Fixing client telemetry validate request (#19959) 4c2d86f4114 is described below commit 4c2d86f411440aeaebcf6c1755bdeb690dfdb35a Author: Apoorv Mittal <apoorvmitta...@gmail.com> AuthorDate: Thu Jun 12 22:52:50 2025 +0100 MINOR: Fixing client telemetry validate request (#19959) Minor fix to correct the validate condition for GetTelemetryRequests. Added respective tests as well. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../apache/kafka/server/ClientMetricsManager.java | 2 +- .../kafka/server/ClientMetricsManagerTest.java | 110 ++++++++++++++++++++- 2 files changed, 110 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java index b143466574c..d7343098b4e 100644 --- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java +++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java @@ -391,7 +391,7 @@ public class ClientMetricsManager implements AutoCloseable { ClientMetricsInstance clientInstance, long timestamp) { if (!clientInstance.maybeUpdateGetRequestTimestamp(timestamp) && (clientInstance.lastKnownError() != Errors.UNKNOWN_SUBSCRIPTION_ID - || clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) { + && clientInstance.lastKnownError() != Errors.UNSUPPORTED_COMPRESSION_TYPE)) { clientMetricsStats.recordThrottleCount(clientInstance.clientInstanceId()); String msg = String.format("Request from the client [%s] arrived before the next push interval time", request.data().clientInstanceId()); diff --git a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java index 3b0bd181b91..c5b13eefbe4 100644 --- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java @@ -826,7 +826,6 @@ public class ClientMetricsManagerTest { assertEquals((double) 0, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_ERROR + "-count").metricValue()); assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-avg").metricValue()); assertEquals(Double.NaN, getMetric(ClientMetricsManager.ClientMetricsStats.PLUGIN_EXPORT_TIME + "-max").metricValue()); - } @Test @@ -1124,6 +1123,115 @@ public class ClientMetricsManagerTest { } } + @Test + public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownSubscriptionSucceeds() throws Exception { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + Properties properties = new Properties(); + properties.put("interval.ms", "100"); + clientMetricsManager.updateSubscription("sub-2", properties); + assertEquals(2, clientMetricsManager.subscriptions().size()); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType(CompressionType.NONE.id) + .setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, response.error()); + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.UNKNOWN_SUBSCRIPTION_ID, instance.lastKnownError()); + + subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build(); + subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + assertEquals(Errors.NONE, subscriptionsResponse.error()); + } + + @Test + public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucceeds() throws Exception { + clientMetricsManager.updateSubscription("sub-1", ClientMetricsTestUtils.defaultTestProperties()); + assertEquals(1, clientMetricsManager.subscriptions().size()); + + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + PushTelemetryRequest request = new Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setCompressionType((byte) 10) // // Invalid compression type + .setMetrics(ByteBuffer.wrap("test-data".getBytes(StandardCharsets.UTF_8))), true).build(); + + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, response.error()); + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, instance.lastKnownError()); + + subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build(); + subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + assertEquals(Errors.NONE, subscriptionsResponse.error()); + } + + @Test + public void testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws Exception { + try ( + Metrics kafkaMetrics = new Metrics(); + ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics) + ) { + GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData(), true).build(); + + GetTelemetrySubscriptionsResponse subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + + byte[] metrics = "ab".getBytes(StandardCharsets.UTF_8); + assertEquals(2, metrics.length); + + PushTelemetryRequest request = new PushTelemetryRequest.Builder( + new PushTelemetryRequestData() + .setClientInstanceId(subscriptionsResponse.data().clientInstanceId()) + .setSubscriptionId(subscriptionsResponse.data().subscriptionId()) + .setMetrics(ByteBuffer.wrap(metrics)), true).build(); + + // Set the max bytes 1 to force the error. + PushTelemetryResponse response = clientMetricsManager.processPushTelemetryRequest( + request, ClientMetricsTestUtils.requestContext()); + + assertEquals(Errors.TELEMETRY_TOO_LARGE, response.error()); + ClientMetricsInstance instance = clientMetricsManager.clientInstance(subscriptionsResponse.data().clientInstanceId()); + assertNotNull(instance); + assertEquals(Errors.TELEMETRY_TOO_LARGE, instance.lastKnownError()); + + subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder( + new GetTelemetrySubscriptionsRequestData().setClientInstanceId(subscriptionsResponse.data().clientInstanceId()), true).build(); + subscriptionsResponse = clientMetricsManager.processGetTelemetrySubscriptionRequest( + subscriptionsRequest, ClientMetricsTestUtils.requestContext()); + assertEquals(Errors.THROTTLING_QUOTA_EXCEEDED, subscriptionsResponse.error()); + } + } + @Test public void testCacheEviction() throws Exception { Properties properties = new Properties();