This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new aceb32d15e6 KAFKA-19747: Update ClientTelemetryReporter telemetry push error handling (#20661)
aceb32d15e6 is described below

commit aceb32d15e621c33df7c45220d12520c5416596a
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Oct 9 16:26:39 2025 -0400

    KAFKA-19747: Update ClientTelemetryReporter telemetry push error handling (#20661)
    
    Previously, when a push telemetry request failed, any exception was
    treated as fatal: the telemetry interval was raised to
    `Integer.MAX_VALUE`, effectively turning telemetry off.  This PR updates
    the error handling to check whether the exception is a transient
    (retriable) one and, if so, keeps the telemetry interval unchanged,
    since recovery is expected.
    
    Reviewers: Apoorv Mittal <[email protected]>, Matthias
     Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../internals/ClientTelemetryReporter.java         |  11 +-
 .../internals/ClientTelemetryReporterTest.java     | 163 +++++++++++++++++++++
 3 files changed, 173 insertions(+), 3 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ca1c3d65763..7c02cf3adf9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -87,7 +87,7 @@
               files="ClientUtils.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
+              
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index e0491943fef..ae60aef66c9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
 import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -526,13 +527,13 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
         @Override
         public void 
handleFailedGetTelemetrySubscriptionsRequest(KafkaException 
maybeFatalException) {
             log.debug("The broker generated an error for the get telemetry 
network API request", maybeFatalException);
-            handleFailedRequest(maybeFatalException != null);
+            handleFailedRequest(isRetryable(maybeFatalException));
         }
 
         @Override
         public void handleFailedPushTelemetryRequest(KafkaException 
maybeFatalException) {
             log.debug("The broker generated an error for the push telemetry 
network API request", maybeFatalException);
-            handleFailedRequest(maybeFatalException != null);
+            handleFailedRequest(isRetryable(maybeFatalException));
         }
 
         @Override
@@ -626,6 +627,12 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
             }
         }
 
+        private boolean isRetryable(final KafkaException maybeFatalException) {
+            return maybeFatalException == null ||
+                (maybeFatalException instanceof RetriableException) ||
+                (maybeFatalException.getCause() != null && 
maybeFatalException.getCause() instanceof RetriableException);
+        }
+
         private Optional<Builder<?>> 
createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
             /*
              If we've previously retrieved a subscription, it will contain the 
client instance ID
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index b708b4eeb60..003061563b7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -19,7 +19,13 @@ package org.apache.kafka.common.telemetry.internals;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
 import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
 import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -770,6 +776,163 @@ public class ClientTelemetryReporterTest {
             .telemetrySender()).state());
     }
 
+    @Test
+    public void 
testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+        KafkaException retriableException = new TimeoutException("Request 
timed out");
+        
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(retriableException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, 
telemetrySender.intervalMs());
+        assertTrue(telemetrySender.enabled());
+    }
+
+    @Test
+    public void 
testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException() 
{
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+        KafkaException wrappedException = new KafkaException(new 
DisconnectException("Connection lost"));
+        
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, 
telemetrySender.intervalMs());
+        assertTrue(telemetrySender.enabled());
+    }
+
+    @Test
+    public void 
testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+        KafkaException fatalException = new AuthorizationException("Not 
authorized for telemetry");
+        
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(fatalException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+        assertFalse(telemetrySender.enabled());
+    }
+
+    @Test
+    public void 
testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+        KafkaException wrappedException = new KafkaException("Version check 
failed", 
+            new UnsupportedVersionException("Broker doesn't support 
telemetry"));
+        
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+        assertFalse(telemetrySender.enabled());
+    }
+
+    @Test
+    public void testHandleFailedPushTelemetryRequestWithRetriableException() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+        KafkaException networkException = new NetworkException("Network 
failure");
+        telemetrySender.handleFailedPushTelemetryRequest(networkException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, 
telemetrySender.intervalMs());
+        assertTrue(telemetrySender.enabled());
+    }
+
+    @Test
+    public void testHandleFailedPushTelemetryRequestWithFatalException() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+        KafkaException authException = new AuthorizationException("Not 
authorized to push telemetry");
+        telemetrySender.handleFailedPushTelemetryRequest(authException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+        assertFalse(telemetrySender.enabled());
+    }
+
+    @Test
+    public void 
testHandleFailedRequestWithMultipleRetriableExceptionsInChain() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+        KafkaException chainedException = new TimeoutException("Outer timeout",
+            new DisconnectException("Inner disconnect"));
+        
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(chainedException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS, 
telemetrySender.intervalMs());
+        assertTrue(telemetrySender.enabled());
+    }
+    
+    @Test
+    public void testHandleFailedRequestWithGenericKafkaException() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+        KafkaException genericException = new KafkaException("Unknown error");
+        
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(genericException);
+
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+        assertFalse(telemetrySender.enabled());
+    }
+
+    @Test
+    public void testHandleFailedRequestDuringTermination() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+
+        KafkaException exception = new TimeoutException("Timeout");
+        telemetrySender.handleFailedPushTelemetryRequest(exception);
+
+        assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED, 
telemetrySender.state());
+        assertTrue(telemetrySender.enabled());
+    }
+
+    @Test
+    public void testSequentialFailuresWithDifferentExceptionTypes() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+            new TimeoutException("Timeout 1"));
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertTrue(telemetrySender.enabled());
+
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+            new DisconnectException("Disconnect"));
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertTrue(telemetrySender.enabled());
+
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+            new UnsupportedVersionException("Version not supported"));
+        assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, 
telemetrySender.state());
+        assertFalse(telemetrySender.enabled());
+    }
+
     @AfterEach
     public void tearDown() {
         clientTelemetryReporter.close();

Reply via email to