This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new a2e133df5a3 KAFKA-16759: Handle telemetry push response while 
terminating (#15957)
a2e133df5a3 is described below

commit a2e133df5a364ad47b640190dd2a5622ed19315c
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed May 15 17:10:34 2024 +0100

    KAFKA-16759: Handle telemetry push response while terminating (#15957)
    
    When client telemetry is configured in a cluster, Kafka producers and 
consumers push metrics to the brokers periodically. There is a special push of 
metrics that occurs when the client is terminating. A state machine in the 
client telemetry reporter controls its behaviour in different states.
    
    Sometimes, when a client was terminating, it attempted an invalid 
state transition from TERMINATING_PUSH_IN_PROGRESS to PUSH_NEEDED when it 
received a response to a PushTelemetry RPC. This was essentially harmless 
because the state transition did not occur, but it did cause unsightly log 
lines to be generated. This PR checks for the terminating states when the 
response is received and, if the client is terminating, simply remains in 
the current state.
    
    I added a test to validate the state management in this case. Note that 
the test also passes without the code change in this PR, but it then produces 
the unsightly log lines.
    
    
    Reviewers: Manikumar Reddy <[email protected]>,  Apoorv Mittal 
<[email protected]>
---
 .../telemetry/internals/ClientTelemetryReporter.java  | 16 ++++++++--------
 .../internals/ClientTelemetryReporterTest.java        | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index ed6f27f4f0b..8f40adf1a7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -489,6 +489,14 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
 
             lock.writeLock().lock();
             try {
+                /*
+                 This is the case when client began termination sometime after 
the last push request
+                 was issued. Just getting the callback, hence need to ignore 
it.
+                */
+                if (isTerminatingState()) {
+                    return;
+                }
+
                 Optional<Integer> errorIntervalMsOpt = 
ClientTelemetryUtils.maybeFetchErrorIntervalMs(data.errorCode(),
                     subscription.pushIntervalMs());
                 /*
@@ -497,14 +505,6 @@ public class ClientTelemetryReporter implements 
MetricsReporter {
                  and the push retried.
                 */
                 if (errorIntervalMsOpt.isPresent()) {
-                    /*
-                     This is the case when client began termination sometime 
after the last push request
-                     was issued. Just getting the callback, hence need to 
ignore it.
-                    */
-                    if (isTerminatingState()) {
-                        return;
-                    }
-
                     if 
(!maybeSetState(ClientTelemetryState.SUBSCRIPTION_NEEDED)) {
                         log.warn("Unable to transition state after failed push 
telemetry from state {}", state);
                     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
index c2ea26ecf37..90e06fe99a7 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.java
@@ -502,6 +502,25 @@ public class ClientTelemetryReporterTest {
         assertTrue(telemetrySender.enabled());
     }
 
+    @Test
+    public void testHandleResponsePushTelemetryTerminating() {
+        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();
+        telemetrySender.updateSubscriptionResult(subscription, 
time.milliseconds());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
+
+        PushTelemetryResponse response = new PushTelemetryResponse(new 
PushTelemetryResponseData());
+
+        telemetrySender.handleResponse(response);
+        // The telemetry sender remains in TERMINATING_PUSH_IN_PROGRESS so 
that a subsequent close() finishes the job
+        assertEquals(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS, 
telemetrySender.state());
+        assertEquals(subscription.pushIntervalMs(), 
telemetrySender.intervalMs());
+        assertTrue(telemetrySender.enabled());
+        
assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
+    }
+
     @Test
     public void testHandleResponsePushTelemetryErrorResponse() {
         ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = 
(ClientTelemetryReporter.DefaultClientTelemetrySender) 
clientTelemetryReporter.telemetrySender();

Reply via email to