This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new a2e133df5a3 KAFKA-16759: Handle telemetry push response while
terminating (#15957)
a2e133df5a3 is described below
commit a2e133df5a364ad47b640190dd2a5622ed19315c
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed May 15 17:10:34 2024 +0100
KAFKA-16759: Handle telemetry push response while terminating (#15957)
When client telemetry is configured in a cluster, Kafka producers and
consumers push metrics to the brokers periodically. There is a special push of
metrics that occurs when the client is terminating. A state machine in the
client telemetry reporter controls its behaviour in different states.
Sometimes, when a client was terminating, it attempted an invalid
state transition from TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED when it
received a response to a PushTelemetry RPC. This was essentially harmless
because the state transition did not occur, but it did cause unsightly log lines
to be generated. This PR checks for the terminating states when
receiving the response and simply remains in the current state.
I added a test to validate the state management in this case. Note that the
test also passes without the code change in this PR, but with unsightly log lines.
Reviewers: Manikumar Reddy <[email protected]>, Apoorv Mittal
<[email protected]>
---
.../telemetry/internals/ClientTelemetryReporter.java | 16 ++++++++--------
.../internals/ClientTelemetryReporterTest.java | 19 +++++++++++++++++++
2 files changed, 27 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index ed6f27f4f0b..8f40adf1a7a 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -489,6 +489,14 @@ public class ClientTelemetryReporter implements
MetricsReporter {
lock.writeLock().lock();
try {
+ /*
+ This is the case when client began termination sometime after
the last push request
+ was issued. Just getting the callback, hence need to ignore
it.
+ */
+ if (isTerminatingState()) {
+ return;
+ }
+
Optional<Integer> errorIntervalMsOpt =
ClientTelemetryUtils.maybeFetchErrorIntervalMs(data.errorCode(),
subscription.pushIntervalMs());
/*
@@ -497,14 +505,6 @@ public class ClientTelemetryReporter implements
MetricsReporter {
and the push retried.
*/
if (errorIntervalMsOpt.isPresent()) {
- /*
- This is the case when client began termination sometime
after the last push request
- was issued. Just getting the callback, hence need to
ignore it.
- */
- if (isTerminatingState()) {
- return;
- }
-
if
(!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
log.warn("Unable to transition state after failed push
telemetry from state {}", state);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index c2ea26ecf37..90e06fe99a7 100644
---
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -502,6 +502,25 @@ public class ClientTelemetryReporterTest {
assertTrue(telemetrySender.enabled());
}
+ @Test
+ public void testHandleResponsePushTelemetryTerminating() {
+ ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();
+ telemetrySender.updateSubscriptionResult(subscription,
time.milliseconds());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
+
+ PushTelemetryResponse response = new PushTelemetryResponse(new
PushTelemetryResponseData());
+
+ telemetrySender.handleResponse(response);
+ // The telemetry sender remains in TERMINATING_PUSH_IN_PROGRESS so
that a subsequent close() finishes the job
+ assertEquals(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS,
telemetrySender.state());
+ assertEquals(subscription.pushIntervalMs(),
telemetrySender.intervalMs());
+ assertTrue(telemetrySender.enabled());
+
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
+ }
+
@Test
public void testHandleResponsePushTelemetryErrorResponse() {
ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender =
(ClientTelemetryReporter.DefaultClientTelemetrySender)
clientTelemetryReporter.telemetrySender();