This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new aceb32d15e6 KAFKA-19747: Update ClientTelemetryReporter telemetry push
error handling (#20661)
aceb32d15e6 is described below
commit aceb32d15e621c33df7c45220d12520c5416596a
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Oct 9 16:26:39 2025 -0400
KAFKA-19747: Update ClientTelemetryReporter telemetry push error handling
(#20661)
When a get-telemetry-subscriptions or push telemetry request fails, any
exception is treated as fatal: the telemetry interval is raised to
`Integer.MAX_VALUE`, effectively turning telemetry off. This PR updates
the error handling to check whether the exception is transient, i.e. the
exception itself or its direct cause is a `RetriableException`. In those
cases recovery is expected, so telemetry stays enabled and the push
interval is left at its regular value instead of being raised to
`Integer.MAX_VALUE`.
Reviewers: Apoorv Mittal <[email protected]>, Matthias Sax <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../internals/ClientTelemetryReporter.java | 11 +-
.../internals/ClientTelemetryReporterTest.java | 163 +++++++++++++++++++++
3 files changed, 173 insertions(+), 3 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ca1c3d65763..7c02cf3adf9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -87,7 +87,7 @@
files="ClientUtils.java"/>
<suppress checks="ClassDataAbstractionCoupling"
-
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
+
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest|ClientTelemetryReporterTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index e0491943fef..ae60aef66c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -526,13 +527,13 @@ public class ClientTelemetryReporter implements MetricsReporter {
@Override
public void
handleFailedGetTelemetrySubscriptionsRequest(KafkaException
maybeFatalException) {
log.debug("The broker generated an error for the get telemetry
network API request", maybeFatalException);
- handleFailedRequest(maybeFatalException != null);
+ handleFailedRequest(isRetryable(maybeFatalException));
}
@Override
public void handleFailedPushTelemetryRequest(KafkaException
maybeFatalException) {
log.debug("The broker generated an error for the push telemetry
network API request", maybeFatalException);
- handleFailedRequest(maybeFatalException != null);
+ handleFailedRequest(isRetryable(maybeFatalException));
}
@Override
@@ -626,6 +627,12 @@ public class ClientTelemetryReporter implements MetricsReporter {
}
}
+ private boolean isRetryable(final KafkaException maybeFatalException) {
+ return maybeFatalException == null ||
+ (maybeFatalException instanceof RetriableException) ||
+ (maybeFatalException.getCause() != null &&
maybeFatalException.getCause() instanceof RetriableException);
+ }
+
private Optional<Builder<?>>
createSubscriptionRequest(ClientTelemetrySubscription localSubscription) {
/*
If we've previously retrieved a subscription, it will contain the
client instance ID
diff --git a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index b708b4eeb60..003061563b7 100644
--- a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -19,7 +19,13 @@ package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.NetworkException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -770,6 +776,163 @@ public class ClientTelemetryReporterTest {
.telemetrySender()).state());
}
+ @Test
+ public void
testHandleFailedGetTelemetrySubscriptionsRequestWithRetriableException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException retriableException = new TimeoutException("Request
timed out");
+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(retriableException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS,
telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void
testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedRetriableException()
{
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException wrappedException = new KafkaException(new
DisconnectException("Connection lost"));
+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS,
telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void
testHandleFailedGetTelemetrySubscriptionsRequestWithFatalException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException fatalException = new AuthorizationException("Not
authorized for telemetry");
+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(fatalException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void
testHandleFailedGetTelemetrySubscriptionsRequestWithWrappedFatalException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException wrappedException = new KafkaException("Version check
failed",
+ new UnsupportedVersionException("Broker doesn't support
telemetry"));
+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(wrappedException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedPushTelemetryRequestWithRetriableException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ KafkaException networkException = new NetworkException("Network
failure");
+ telemetrySender.handleFailedPushTelemetryRequest(networkException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS,
telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedPushTelemetryRequestWithFatalException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
+
+ KafkaException authException = new AuthorizationException("Not
authorized to push telemetry");
+ telemetrySender.handleFailedPushTelemetryRequest(authException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void
testHandleFailedRequestWithMultipleRetriableExceptionsInChain() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException chainedException = new TimeoutException("Outer timeout",
+ new DisconnectException("Inner disconnect"));
+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(chainedException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(ClientTelemetryReporter.DEFAULT_PUSH_INTERVAL_MS,
telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedRequestWithGenericKafkaException() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
+ KafkaException genericException = new KafkaException("Unknown error");
+
telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(genericException);
+
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertEquals(Integer.MAX_VALUE, telemetrySender.intervalMs());
+ assertFalse(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testHandleFailedRequestDuringTermination() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+
+ KafkaException exception = new TimeoutException("Timeout");
+ telemetrySender.handleFailedPushTelemetryRequest(exception);
+
+ assertEquals(ClientTelemetryState.TERMINATING_PUSH_NEEDED,
telemetrySender.state());
+ assertTrue(telemetrySender.enabled());
+ }
+
+ @Test
+ public void testSequentialFailuresWithDifferentExceptionTypes() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+ new TimeoutException("Timeout 1"));
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertTrue(telemetrySender.enabled());
+
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+ new DisconnectException("Disconnect"));
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertTrue(telemetrySender.enabled());
+
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+ telemetrySender.handleFailedGetTelemetrySubscriptionsRequest(
+ new UnsupportedVersionException("Version not supported"));
+ assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED,
telemetrySender.state());
+ assertFalse(telemetrySender.enabled());
+ }
+
@AfterEach
public void tearDown() {
clientTelemetryReporter.close();